http://git-wip-us.apache.org/repos/asf/flink/blob/d95d20eb/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index effa498..4abcdf4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.taskexecutor;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
 import org.apache.flink.runtime.blob.BlobCache;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
@@ -27,6 +28,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
 import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
@@ -59,7 +61,6 @@ import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
-import org.apache.flink.runtime.rpc.RpcMethod;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
 import org.apache.flink.runtime.taskexecutor.exceptions.CheckpointException;
@@ -105,7 +106,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * TaskExecutor implementation. The task executor is responsible for the 
execution of multiple
  * {@link Task}.
  */
-public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
+public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 
        public static final String TASK_MANAGER_NAME = "taskmanager";
 
@@ -288,48 +289,51 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
        // Task lifecycle RPCs
        // 
----------------------------------------------------------------------
 
-       @RpcMethod
-       public Acknowledge submitTask(TaskDeploymentDescriptor tdd, UUID 
jobManagerLeaderId) throws TaskSubmissionException {
+       @Override
+       public CompletableFuture<Acknowledge> submitTask(
+                       TaskDeploymentDescriptor tdd,
+                       UUID jobManagerLeaderId,
+                       Time timeout) {
 
-               // first, deserialize the pre-serialized information
-               final JobInformation jobInformation;
-               final TaskInformation taskInformation;
                try {
-                       jobInformation = 
tdd.getSerializedJobInformation().deserializeValue(getClass().getClassLoader());
-                       taskInformation = 
tdd.getSerializedTaskInformation().deserializeValue(getClass().getClassLoader());
-               }
-               catch (IOException | ClassNotFoundException e) {
-                       throw new TaskSubmissionException("Could not 
deserialize the job or task information.", e);
-               }
+                       // first, deserialize the pre-serialized information
+                       final JobInformation jobInformation;
+                       final TaskInformation taskInformation;
+                       try {
+                               jobInformation = 
tdd.getSerializedJobInformation().deserializeValue(getClass().getClassLoader());
+                               taskInformation = 
tdd.getSerializedTaskInformation().deserializeValue(getClass().getClassLoader());
+                       } catch (IOException | ClassNotFoundException e) {
+                               throw new TaskSubmissionException("Could not 
deserialize the job or task information.", e);
+                       }
 
-               final JobID jobId = jobInformation.getJobId();
-               final JobManagerConnection jobManagerConnection = 
jobManagerTable.get(jobId);
+                       final JobID jobId = jobInformation.getJobId();
+                       final JobManagerConnection jobManagerConnection = 
jobManagerTable.get(jobId);
 
-               if (jobManagerConnection == null) {
-                       final String message = "Could not submit task because 
there is no JobManager " +
-                               "associated for the job " + jobId + '.';
+                       if (jobManagerConnection == null) {
+                               final String message = "Could not submit task 
because there is no JobManager " +
+                                       "associated for the job " + jobId + '.';
 
-                       log.debug(message);
-                       throw new TaskSubmissionException(message);
-               }
+                               log.debug(message);
+                               throw new TaskSubmissionException(message);
+                       }
 
-               if (!Objects.equals(jobManagerConnection.getLeaderId(), 
jobManagerLeaderId)) {
-                       final String message = "Rejecting the task submission 
because the job manager leader id " +
-                               jobManagerLeaderId + " does not match the 
expected job manager leader id " +
-                               jobManagerConnection.getLeaderId() + '.';
+                       if (!Objects.equals(jobManagerConnection.getLeaderId(), 
jobManagerLeaderId)) {
+                               final String message = "Rejecting the task 
submission because the job manager leader id " +
+                                       jobManagerLeaderId + " does not match 
the expected job manager leader id " +
+                                       jobManagerConnection.getLeaderId() + 
'.';
 
-                       log.debug(message);
-                       throw new TaskSubmissionException(message);
-               }
+                               log.debug(message);
+                               throw new TaskSubmissionException(message);
+                       }
 
-               if (!taskSlotTable.existsActiveSlot(jobId, 
tdd.getAllocationId())) {
-                       final String message = "No task slot allocated for job 
ID " + jobId +
-                               " and allocation ID " + tdd.getAllocationId() + 
'.';
-                       log.debug(message);
-                       throw new TaskSubmissionException(message);
-               }
+                       if (!taskSlotTable.existsActiveSlot(jobId, 
tdd.getAllocationId())) {
+                               final String message = "No task slot allocated 
for job ID " + jobId +
+                                       " and allocation ID " + 
tdd.getAllocationId() + '.';
+                               log.debug(message);
+                               throw new TaskSubmissionException(message);
+                       }
 
-               TaskMetricGroup taskMetricGroup = 
taskManagerMetricGroup.addTaskForJob(
+                       TaskMetricGroup taskMetricGroup = 
taskManagerMetricGroup.addTaskForJob(
                                jobInformation.getJobId(),
                                jobInformation.getJobName(),
                                taskInformation.getJobVertexId(),
@@ -338,7 +342,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
                                tdd.getSubtaskIndex(),
                                tdd.getAttemptNumber());
 
-               InputSplitProvider inputSplitProvider = new 
RpcInputSplitProvider(
+                       InputSplitProvider inputSplitProvider = new 
RpcInputSplitProvider(
                                jobManagerConnection.getLeaderId(),
                                jobManagerConnection.getJobManagerGateway(),
                                jobInformation.getJobId(),
@@ -346,96 +350,100 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
                                tdd.getExecutionAttemptId(),
                                taskManagerConfiguration.getTimeout());
 
-               TaskManagerActions taskManagerActions = 
jobManagerConnection.getTaskManagerActions();
-               CheckpointResponder checkpointResponder = 
jobManagerConnection.getCheckpointResponder();
-               LibraryCacheManager libraryCache = 
jobManagerConnection.getLibraryCacheManager();
-               ResultPartitionConsumableNotifier 
resultPartitionConsumableNotifier = 
jobManagerConnection.getResultPartitionConsumableNotifier();
-               PartitionProducerStateChecker partitionStateChecker = 
jobManagerConnection.getPartitionStateChecker();
-
-               Task task = new Task(
-                       jobInformation,
-                       taskInformation,
-                       tdd.getExecutionAttemptId(),
-                       tdd.getAllocationId(),
-                       tdd.getSubtaskIndex(),
-                       tdd.getAttemptNumber(),
-                       tdd.getProducedPartitions(),
-                       tdd.getInputGates(),
-                       tdd.getTargetSlotNumber(),
-                       tdd.getTaskStateHandles(),
-                       memoryManager,
-                       ioManager,
-                       networkEnvironment,
-                       broadcastVariableManager,
-                       taskManagerActions,
-                       inputSplitProvider,
-                       checkpointResponder,
-                       libraryCache,
-                       fileCache,
-                       taskManagerConfiguration,
-                       taskMetricGroup,
-                       resultPartitionConsumableNotifier,
-                       partitionStateChecker,
-                       getRpcService().getExecutor());
+                       TaskManagerActions taskManagerActions = 
jobManagerConnection.getTaskManagerActions();
+                       CheckpointResponder checkpointResponder = 
jobManagerConnection.getCheckpointResponder();
+                       LibraryCacheManager libraryCache = 
jobManagerConnection.getLibraryCacheManager();
+                       ResultPartitionConsumableNotifier 
resultPartitionConsumableNotifier = 
jobManagerConnection.getResultPartitionConsumableNotifier();
+                       PartitionProducerStateChecker partitionStateChecker = 
jobManagerConnection.getPartitionStateChecker();
 
-               log.info("Received task {}.", 
task.getTaskInfo().getTaskNameWithSubtasks());
-
-               boolean taskAdded;
+                       Task task = new Task(
+                               jobInformation,
+                               taskInformation,
+                               tdd.getExecutionAttemptId(),
+                               tdd.getAllocationId(),
+                               tdd.getSubtaskIndex(),
+                               tdd.getAttemptNumber(),
+                               tdd.getProducedPartitions(),
+                               tdd.getInputGates(),
+                               tdd.getTargetSlotNumber(),
+                               tdd.getTaskStateHandles(),
+                               memoryManager,
+                               ioManager,
+                               networkEnvironment,
+                               broadcastVariableManager,
+                               taskManagerActions,
+                               inputSplitProvider,
+                               checkpointResponder,
+                               libraryCache,
+                               fileCache,
+                               taskManagerConfiguration,
+                               taskMetricGroup,
+                               resultPartitionConsumableNotifier,
+                               partitionStateChecker,
+                               getRpcService().getExecutor());
+
+                       log.info("Received task {}.", 
task.getTaskInfo().getTaskNameWithSubtasks());
+
+                       boolean taskAdded;
 
-               try {
-                       taskAdded = taskSlotTable.addTask(task);
-               } catch (SlotNotFoundException | SlotNotActiveException e) {
-                       throw new TaskSubmissionException("Could not submit 
task.", e);
-               }
+                       try {
+                               taskAdded = taskSlotTable.addTask(task);
+                       } catch (SlotNotFoundException | SlotNotActiveException 
e) {
+                               throw new TaskSubmissionException("Could not 
submit task.", e);
+                       }
 
-               if (taskAdded) {
-                       task.startTaskThread();
+                       if (taskAdded) {
+                               task.startTaskThread();
 
-                       return Acknowledge.get();
-               } else {
-                       final String message = "TaskManager already contains a 
task for id " +
-                               task.getExecutionId() + '.';
+                               return 
CompletableFuture.completedFuture(Acknowledge.get());
+                       } else {
+                               final String message = "TaskManager already 
contains a task for id " +
+                                       task.getExecutionId() + '.';
 
-                       log.debug(message);
-                       throw new TaskSubmissionException(message);
+                               log.debug(message);
+                               throw new TaskSubmissionException(message);
+                       }
+               } catch (TaskSubmissionException e) {
+                       return FutureUtils.completedExceptionally(e);
                }
        }
 
-       @RpcMethod
-       public Acknowledge cancelTask(ExecutionAttemptID executionAttemptID) 
throws TaskException {
+       @Override
+       public CompletableFuture<Acknowledge> cancelTask(ExecutionAttemptID 
executionAttemptID, Time timeout) {
                final Task task = taskSlotTable.getTask(executionAttemptID);
 
                if (task != null) {
                        try {
                                task.cancelExecution();
-                               return Acknowledge.get();
+                               return 
CompletableFuture.completedFuture(Acknowledge.get());
                        } catch (Throwable t) {
-                               throw new TaskException("Cannot cancel task for 
execution " + executionAttemptID + '.', t);
+                               return FutureUtils.completedExceptionally(
+                                       new TaskException("Cannot cancel task 
for execution " + executionAttemptID + '.', t));
                        }
                } else {
                        final String message = "Cannot find task to stop for 
execution " + executionAttemptID + '.';
 
                        log.debug(message);
-                       throw new TaskException(message);
+                       return FutureUtils.completedExceptionally(new 
TaskException(message));
                }
        }
 
-       @RpcMethod
-       public Acknowledge stopTask(ExecutionAttemptID executionAttemptID) 
throws TaskException {
+       @Override
+       public CompletableFuture<Acknowledge> stopTask(ExecutionAttemptID 
executionAttemptID, Time timeout) {
                final Task task = taskSlotTable.getTask(executionAttemptID);
 
                if (task != null) {
                        try {
                                task.stopExecution();
-                               return Acknowledge.get();
+                               return 
CompletableFuture.completedFuture(Acknowledge.get());
                        } catch (Throwable t) {
-                               throw new TaskException("Cannot stop task for 
execution " + executionAttemptID + '.', t);
+                               return FutureUtils.completedExceptionally(new 
TaskException("Cannot stop task for execution " + executionAttemptID + '.', t));
                        }
                } else {
                        final String message = "Cannot find task to stop for 
execution " + executionAttemptID + '.';
 
                        log.debug(message);
-                       throw new TaskException(message);
+                       return FutureUtils.completedExceptionally(new 
TaskException(message));
                }
        }
 
@@ -443,8 +451,11 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
        // Partition lifecycle RPCs
        // 
----------------------------------------------------------------------
 
-       @RpcMethod
-       public Acknowledge updatePartitions(final ExecutionAttemptID 
executionAttemptID, Iterable<PartitionInfo> partitionInfos) throws 
PartitionException {
+       @Override
+       public CompletableFuture<Acknowledge> updatePartitions(
+                       final ExecutionAttemptID executionAttemptID,
+                       Iterable<PartitionInfo> partitionInfos,
+                       Time timeout) {
                final Task task = taskSlotTable.getTask(executionAttemptID);
 
                if (task != null) {
@@ -455,9 +466,8 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 
                                if (singleInputGate != null) {
                                        // Run asynchronously because it might 
be blocking
-                                       getRpcService().execute(new Runnable() {
-                                               @Override
-                                               public void run() {
+                                       getRpcService().execute(
+                                               () -> {
                                                        try {
                                                                
singleInputGate.updateInputChannel(partitionInfo.getInputChannelDeploymentDescriptor());
                                                        } catch (IOException | 
InterruptedException e) {
@@ -470,23 +480,22 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
                                                                        
log.error("Failed canceling task with execution ID {} after task update 
failure.", executionAttemptID, re);
                                                                }
                                                        }
-                                               }
-                                       });
+                                               });
                                } else {
-                                       throw new PartitionException("No reader 
with ID " +
-                                               intermediateResultPartitionID + 
" for task " + executionAttemptID +
-                                               " was found.");
+                                       return 
FutureUtils.completedExceptionally(
+                                               new PartitionException("No 
reader with ID " + intermediateResultPartitionID +
+                                                       " for task " + 
executionAttemptID + " was found."));
                                }
                        }
 
-                       return Acknowledge.get();
+                       return 
CompletableFuture.completedFuture(Acknowledge.get());
                } else {
                        log.debug("Discard update for input partitions of task 
{}. Task is no longer running.", executionAttemptID);
-                       return Acknowledge.get();
+                       return 
CompletableFuture.completedFuture(Acknowledge.get());
                }
        }
 
-       @RpcMethod
+       @Override
        public void failPartition(ExecutionAttemptID executionAttemptID) {
                log.info("Discarding the results produced by task execution 
{}.", executionAttemptID);
 
@@ -504,12 +513,12 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
        // Heartbeat RPC
        // 
----------------------------------------------------------------------
 
-       @RpcMethod
+       @Override
        public void heartbeatFromJobManager(ResourceID resourceID) {
                jobManagerHeartbeatManager.requestHeartbeat(resourceID, null);
        }
 
-       @RpcMethod
+       @Override
        public void heartbeatFromResourceManager(ResourceID resourceID) {
                resourceManagerHeartbeatManager.requestHeartbeat(resourceID, 
null);
        }
@@ -518,8 +527,12 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
        // Checkpointing RPCs
        // 
----------------------------------------------------------------------
 
-       @RpcMethod
-       public Acknowledge triggerCheckpoint(ExecutionAttemptID 
executionAttemptID, long checkpointId, long checkpointTimestamp, 
CheckpointOptions checkpointOptions) throws CheckpointException {
+       @Override
+       public CompletableFuture<Acknowledge> triggerCheckpoint(
+                       ExecutionAttemptID executionAttemptID,
+                       long checkpointId,
+                       long checkpointTimestamp,
+                       CheckpointOptions checkpointOptions) {
                log.debug("Trigger checkpoint {}@{} for {}.", checkpointId, 
checkpointTimestamp, executionAttemptID);
 
                final Task task = taskSlotTable.getTask(executionAttemptID);
@@ -527,17 +540,20 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
                if (task != null) {
                        task.triggerCheckpointBarrier(checkpointId, 
checkpointTimestamp, checkpointOptions);
 
-                       return Acknowledge.get();
+                       return 
CompletableFuture.completedFuture(Acknowledge.get());
                } else {
                        final String message = "TaskManager received a 
checkpoint request for unknown task " + executionAttemptID + '.';
 
                        log.debug(message);
-                       throw new CheckpointException(message);
+                       return FutureUtils.completedExceptionally(new 
CheckpointException(message));
                }
        }
 
-       @RpcMethod
-       public Acknowledge confirmCheckpoint(ExecutionAttemptID 
executionAttemptID, long checkpointId, long checkpointTimestamp) throws 
CheckpointException {
+       @Override
+       public CompletableFuture<Acknowledge> confirmCheckpoint(
+                       ExecutionAttemptID executionAttemptID,
+                       long checkpointId,
+                       long checkpointTimestamp) {
                log.debug("Confirm checkpoint {}@{} for {}.", checkpointId, 
checkpointTimestamp, executionAttemptID);
 
                final Task task = taskSlotTable.getTask(executionAttemptID);
@@ -545,12 +561,12 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
                if (task != null) {
                        task.notifyCheckpointComplete(checkpointId);
 
-                       return Acknowledge.get();
+                       return 
CompletableFuture.completedFuture(Acknowledge.get());
                } else {
                        final String message = "TaskManager received a 
checkpoint confirmation for unknown task " + executionAttemptID + '.';
 
                        log.debug(message);
-                       throw new CheckpointException(message);
+                       return FutureUtils.completedExceptionally(new 
CheckpointException(message));
                }
        }
 
@@ -569,85 +585,90 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
         * @throws SlotAllocationException if the slot allocation fails
         * @return answer to the slot request
         */
-       @RpcMethod
-       public Acknowledge requestSlot(
+       @Override
+       public CompletableFuture<Acknowledge> requestSlot(
                final SlotID slotId,
                final JobID jobId,
                final AllocationID allocationId,
                final String targetAddress,
-               final UUID rmLeaderId) throws SlotAllocationException {
+               final UUID rmLeaderId,
+               final Time timeout) {
                // TODO: Filter invalid requests from the resource manager by 
using the instance/registration Id
 
                log.info("Receive slot request {} for job {} from resource 
manager with leader id {}.",
                        allocationId, jobId, rmLeaderId);
 
-               if (resourceManagerConnection == null) {
-                       final String message = "TaskManager is not connected to 
a resource manager.";
-                       log.debug(message);
-                       throw new SlotAllocationException(message);
-               }
-
-               if 
(!resourceManagerConnection.getTargetLeaderId().equals(rmLeaderId)) {
-                       final String message = "The leader id " + rmLeaderId +
-                               " does not match with the leader id of the 
connected resource manager " +
-                               resourceManagerConnection.getTargetLeaderId() + 
'.';
+               try {
+                       if (resourceManagerConnection == null) {
+                               final String message = "TaskManager is not 
connected to a resource manager.";
+                               log.debug(message);
+                               throw new SlotAllocationException(message);
+                       }
 
-                       log.debug(message);
-                       throw new SlotAllocationException(message);
-               }
+                       if 
(!resourceManagerConnection.getTargetLeaderId().equals(rmLeaderId)) {
+                               final String message = "The leader id " + 
rmLeaderId +
+                                       " does not match with the leader id of 
the connected resource manager " +
+                                       
resourceManagerConnection.getTargetLeaderId() + '.';
 
-               if (taskSlotTable.isSlotFree(slotId.getSlotNumber())) {
-                       if (taskSlotTable.allocateSlot(slotId.getSlotNumber(), 
jobId, allocationId, taskManagerConfiguration.getTimeout())) {
-                               log.info("Allocated slot for {}.", 
allocationId);
-                       } else {
-                               log.info("Could not allocate slot for {}.", 
allocationId);
-                               throw new SlotAllocationException("Could not 
allocate slot.");
+                               log.debug(message);
+                               throw new SlotAllocationException(message);
                        }
-               } else if (!taskSlotTable.isAllocated(slotId.getSlotNumber(), 
jobId, allocationId)) {
-                       final String message = "The slot " + slotId + " has 
already been allocated for a different job.";
 
-                       log.info(message);
+                       if (taskSlotTable.isSlotFree(slotId.getSlotNumber())) {
+                               if 
(taskSlotTable.allocateSlot(slotId.getSlotNumber(), jobId, allocationId, 
taskManagerConfiguration.getTimeout())) {
+                                       log.info("Allocated slot for {}.", 
allocationId);
+                               } else {
+                                       log.info("Could not allocate slot for 
{}.", allocationId);
+                                       throw new 
SlotAllocationException("Could not allocate slot.");
+                               }
+                       } else if 
(!taskSlotTable.isAllocated(slotId.getSlotNumber(), jobId, allocationId)) {
+                               final String message = "The slot " + slotId + " 
has already been allocated for a different job.";
 
-                       throw new SlotOccupiedException(message, 
taskSlotTable.getCurrentAllocation(slotId.getSlotNumber()));
-               }
+                               log.info(message);
 
-               if (jobManagerTable.contains(jobId)) {
-                       offerSlotsToJobManager(jobId);
-               } else {
-                       try {
-                               jobLeaderService.addJob(jobId, targetAddress);
-                       } catch (Exception e) {
-                               // free the allocated slot
+                               throw new SlotOccupiedException(message, 
taskSlotTable.getCurrentAllocation(slotId.getSlotNumber()));
+                       }
+
+                       if (jobManagerTable.contains(jobId)) {
+                               offerSlotsToJobManager(jobId);
+                       } else {
                                try {
-                                       taskSlotTable.freeSlot(allocationId);
-                               } catch (SlotNotFoundException 
slotNotFoundException) {
-                                       // slot no longer existent, this should 
actually never happen, because we've
-                                       // just allocated the slot. So let's 
fail hard in this case!
-                                       onFatalError(slotNotFoundException);
-                               }
+                                       jobLeaderService.addJob(jobId, 
targetAddress);
+                               } catch (Exception e) {
+                                       // free the allocated slot
+                                       try {
+                                               
taskSlotTable.freeSlot(allocationId);
+                                       } catch (SlotNotFoundException 
slotNotFoundException) {
+                                               // slot no longer existent, 
this should actually never happen, because we've
+                                               // just allocated the slot. So 
let's fail hard in this case!
+                                               
onFatalError(slotNotFoundException);
+                                       }
 
-                               // sanity check
-                               if 
(!taskSlotTable.isSlotFree(slotId.getSlotNumber())) {
-                                       onFatalError(new Exception("Could not 
free slot " + slotId));
-                               }
+                                       // sanity check
+                                       if 
(!taskSlotTable.isSlotFree(slotId.getSlotNumber())) {
+                                               onFatalError(new 
Exception("Could not free slot " + slotId));
+                                       }
 
-                               throw new SlotAllocationException("Could not 
add job to job leader service.", e);
+                                       throw new 
SlotAllocationException("Could not add job to job leader service.", e);
+                               }
                        }
+               } catch (SlotAllocationException slotAllocationException) {
+                       return 
FutureUtils.completedExceptionally(slotAllocationException);
                }
 
-               return Acknowledge.get();
+               return CompletableFuture.completedFuture(Acknowledge.get());
        }
 
        // 
----------------------------------------------------------------------
        // Disconnection RPCs
        // 
----------------------------------------------------------------------
 
-       @RpcMethod
+       @Override
        public void disconnectJobManager(JobID jobId, Exception cause) {
                closeJobManagerConnection(jobId, cause);
        }
 
-       @RpcMethod
+       @Override
        public void disconnectResourceManager(Exception cause) {
                closeResourceManagerConnection(cause);
        }
@@ -767,7 +788,7 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
                                        reservedSlots.add(offer);
                                }
 
-                               CompletableFuture<Iterable<SlotOffer>> 
acceptedSlotsFuture = jobMasterGateway.offerSlots(
+                               CompletableFuture<Collection<SlotOffer>> 
acceptedSlotsFuture = jobMasterGateway.offerSlots(
                                        getResourceID(),
                                        reservedSlots,
                                        leaderId,

http://git-wip-us.apache.org/repos/asf/flink/blob/d95d20eb/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
index e1144c9..3ca0327 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
@@ -528,7 +528,8 @@ public class ResourceManagerTest extends TestLogger {
                                rmLeaderSessionId,
                                taskManagerAddress,
                                taskManagerResourceID,
-                               slotReport);
+                               slotReport,
+                               Time.milliseconds(0L));
                        RegistrationResponse response = successfulFuture.get(5, 
TimeUnit.SECONDS);
                        assertTrue(response instanceof 
TaskExecutorRegistrationSuccess);
 
@@ -627,7 +628,8 @@ public class ResourceManagerTest extends TestLogger {
                                jmLeaderId,
                                jmResourceId,
                                jobMasterAddress,
-                               jobId);
+                               jobId,
+                               Time.milliseconds(0L));
                        RegistrationResponse response = successfulFuture.get(5, 
TimeUnit.SECONDS);
                        assertTrue(response instanceof 
JobMasterRegistrationSuccess);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d95d20eb/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index 267f10b..3814684 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -85,7 +85,7 @@ public class DispatcherTest extends TestLogger {
 
                        dispatcher.start();
 
-                       DispatcherGateway dispatcherGateway = 
dispatcher.getSelf();
+                       DispatcherGateway dispatcherGateway = 
dispatcher.getSelfGateway(DispatcherGateway.class);
 
                        CompletableFuture<Acknowledge> acknowledgeFuture = 
dispatcherGateway.submitJob(jobGraph, timeout);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d95d20eb/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
index 4cc4f11..8d613ac 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
@@ -82,7 +82,7 @@ public class SlotPoolRpcTest {
                );
                pool.start(UUID.randomUUID(), "foobar");
 
-               CompletableFuture<SimpleSlot> future = 
pool.allocateSlot(mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null);
+               CompletableFuture<SimpleSlot> future = 
pool.allocateSlot(mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, 
Time.days(1));
 
                try {
                        future.get(4, TimeUnit.SECONDS);

http://git-wip-us.apache.org/repos/asf/flink/blob/d95d20eb/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
index 3e2293b..aeceb59 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
@@ -101,7 +101,7 @@ public class SlotPoolTest extends TestLogger {
                slotPool.registerTaskManager(resourceID);
 
                ScheduledUnit task = mock(ScheduledUnit.class);
-               CompletableFuture<SimpleSlot> future = 
slotPool.allocateSlot(task, DEFAULT_TESTING_PROFILE, null);
+               CompletableFuture<SimpleSlot> future = 
slotPool.allocateSlot(task, DEFAULT_TESTING_PROFILE, null, 
Time.milliseconds(0L));
                assertFalse(future.isDone());
 
                ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = 
ArgumentCaptor.forClass(SlotRequest.class);
@@ -110,7 +110,7 @@ public class SlotPoolTest extends TestLogger {
                final SlotRequest slotRequest = 
slotRequestArgumentCaptor.getValue();
 
                AllocatedSlot allocatedSlot = createAllocatedSlot(resourceID, 
slotRequest.getAllocationId(), jobId, DEFAULT_TESTING_PROFILE);
-               assertTrue(slotPool.offerSlot(allocatedSlot));
+               assertTrue(slotPool.offerSlot(allocatedSlot).get());
 
                SimpleSlot slot = future.get(1, TimeUnit.SECONDS);
                assertTrue(future.isDone());
@@ -126,8 +126,8 @@ public class SlotPoolTest extends TestLogger {
                ResourceID resourceID = new ResourceID("resource");
                slotPool.registerTaskManager(resourceID);
 
-               CompletableFuture<SimpleSlot> future1 = 
slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null);
-               CompletableFuture<SimpleSlot> future2 = 
slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null);
+               CompletableFuture<SimpleSlot> future1 = 
slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null, 
Time.milliseconds(0L));
+               CompletableFuture<SimpleSlot> future2 = 
slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null, 
Time.milliseconds(0L));
 
                assertFalse(future1.isDone());
                assertFalse(future2.isDone());
@@ -139,7 +139,7 @@ public class SlotPoolTest extends TestLogger {
                final List<SlotRequest> slotRequests = 
slotRequestArgumentCaptor.getAllValues();
 
                AllocatedSlot allocatedSlot = createAllocatedSlot(resourceID, 
slotRequests.get(0).getAllocationId(), jobId, DEFAULT_TESTING_PROFILE);
-               assertTrue(slotPool.offerSlot(allocatedSlot));
+               assertTrue(slotPool.offerSlot(allocatedSlot).get());
 
                SimpleSlot slot1 = future1.get(1, TimeUnit.SECONDS);
                assertTrue(future1.isDone());
@@ -165,7 +165,7 @@ public class SlotPoolTest extends TestLogger {
                ResourceID resourceID = new ResourceID("resource");
                slotPool.registerTaskManager(resourceID);
 
-               CompletableFuture<SimpleSlot> future1 = 
slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null);
+               CompletableFuture<SimpleSlot> future1 = 
slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null, 
Time.milliseconds(0L));
                assertFalse(future1.isDone());
 
                ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = 
ArgumentCaptor.forClass(SlotRequest.class);
@@ -174,7 +174,7 @@ public class SlotPoolTest extends TestLogger {
                final SlotRequest slotRequest = 
slotRequestArgumentCaptor.getValue();
 
                AllocatedSlot allocatedSlot = createAllocatedSlot(resourceID, 
slotRequest.getAllocationId(), jobId, DEFAULT_TESTING_PROFILE);
-               assertTrue(slotPool.offerSlot(allocatedSlot));
+               assertTrue(slotPool.offerSlot(allocatedSlot).get());
 
                SimpleSlot slot1 = future1.get(1, TimeUnit.SECONDS);
                assertTrue(future1.isDone());
@@ -182,7 +182,7 @@ public class SlotPoolTest extends TestLogger {
                // return this slot to pool
                slot1.releaseSlot();
 
-               CompletableFuture<SimpleSlot> future2 = 
slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null);
+               CompletableFuture<SimpleSlot> future2 = 
slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null, 
Time.milliseconds(0L));
 
                // second allocation fulfilled by previous slot returning
                SimpleSlot slot2 = future2.get(1, TimeUnit.SECONDS);
@@ -200,7 +200,7 @@ public class SlotPoolTest extends TestLogger {
                ResourceID resourceID = new ResourceID("resource");
                slotPool.registerTaskManager(resourceID);
 
-               CompletableFuture<SimpleSlot> future = 
slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null);
+               CompletableFuture<SimpleSlot> future = 
slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null, 
Time.milliseconds(0L));
                assertFalse(future.isDone());
 
                ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = 
ArgumentCaptor.forClass(SlotRequest.class);
@@ -210,30 +210,30 @@ public class SlotPoolTest extends TestLogger {
 
                // slot from unregistered resource
                AllocatedSlot invalid = createAllocatedSlot(new 
ResourceID("unregistered"), slotRequest.getAllocationId(), jobId, 
DEFAULT_TESTING_PROFILE);
-               assertFalse(slotPool.offerSlot(invalid));
+               assertFalse(slotPool.offerSlot(invalid).get());
 
                AllocatedSlot notRequested = createAllocatedSlot(resourceID, 
new AllocationID(), jobId, DEFAULT_TESTING_PROFILE);
 
                // we'll also accept non requested slots
-               assertTrue(slotPool.offerSlot(notRequested));
+               assertTrue(slotPool.offerSlot(notRequested).get());
 
                AllocatedSlot allocatedSlot = createAllocatedSlot(resourceID, 
slotRequest.getAllocationId(), jobId, DEFAULT_TESTING_PROFILE);
 
                // accepted slot
-               assertTrue(slotPool.offerSlot(allocatedSlot));
+               assertTrue(slotPool.offerSlot(allocatedSlot).get());
                SimpleSlot slot = future.get(1, TimeUnit.SECONDS);
                assertTrue(future.isDone());
                assertTrue(slot.isAlive());
 
                // duplicated offer with using slot
-               assertTrue(slotPool.offerSlot(allocatedSlot));
+               assertTrue(slotPool.offerSlot(allocatedSlot).get());
                assertTrue(future.isDone());
                assertTrue(slot.isAlive());
 
                // duplicated offer with free slot
                slot.releaseSlot();
                assertTrue(slot.isReleased());
-               assertTrue(slotPool.offerSlot(allocatedSlot));
+               assertTrue(slotPool.offerSlot(allocatedSlot).get());
        }
 
        @Test
@@ -241,17 +241,17 @@ public class SlotPoolTest extends TestLogger {
                ResourceID resourceID = new ResourceID("resource");
                slotPool.registerTaskManager(resourceID);
 
-               CompletableFuture<SimpleSlot> future1 = 
slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null);
+               CompletableFuture<SimpleSlot> future1 = 
slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null, 
Time.milliseconds(0L));
 
                ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = 
ArgumentCaptor.forClass(SlotRequest.class);
                verify(resourceManagerGateway).requestSlot(any(UUID.class), 
any(UUID.class), slotRequestArgumentCaptor.capture(), any(Time.class));
 
                final SlotRequest slotRequest = 
slotRequestArgumentCaptor.getValue();
 
-               CompletableFuture<SimpleSlot> future2 = 
slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null);
+               CompletableFuture<SimpleSlot> future2 = 
slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null, 
Time.milliseconds(0L));
 
                AllocatedSlot allocatedSlot = createAllocatedSlot(resourceID, 
slotRequest.getAllocationId(), jobId, DEFAULT_TESTING_PROFILE);
-               assertTrue(slotPool.offerSlot(allocatedSlot));
+               assertTrue(slotPool.offerSlot(allocatedSlot).get());
 
                SimpleSlot slot1 = future1.get(1, TimeUnit.SECONDS);
                assertTrue(future1.isDone());

http://git-wip-us.apache.org/repos/asf/flink/blob/d95d20eb/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
index 7c58879..435c23d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
@@ -52,6 +52,7 @@ import static 
org.apache.flink.util.Preconditions.checkArgument;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -83,7 +84,7 @@ public class JobManagerRunnerMockTest extends TestLogger {
 
                jobManager = mock(JobMaster.class);
                jobManagerGateway = mock(JobMasterGateway.class);
-               when(jobManager.getSelf()).thenReturn(jobManagerGateway);
+               
when(jobManager.getSelfGateway(eq(JobMasterGateway.class))).thenReturn(jobManagerGateway);
                when(jobManager.getRpcService()).thenReturn(mockRpc);
 
                
PowerMockito.whenNew(JobMaster.class).withAnyArguments().thenReturn(jobManager);

http://git-wip-us.apache.org/repos/asf/flink/blob/d95d20eb/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index 48a1d45..0c4d376 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -112,7 +112,7 @@ public class JobMasterTest extends TestLogger {
                        jobMaster.start(jmLeaderId);
 
                        // register task manager will trigger monitor heartbeat 
target, schedule heartbeat request at interval time
-                       jobMaster.registerTaskManager(taskManagerAddress, 
taskManagerLocation, jmLeaderId);
+                       jobMaster.registerTaskManager(taskManagerAddress, 
taskManagerLocation, jmLeaderId, Time.milliseconds(0L));
 
                        ArgumentCaptor<Runnable> heartbeatRunnableCaptor = 
ArgumentCaptor.forClass(Runnable.class);
                        verify(scheduledExecutor, times(1)).scheduleAtFixedRate(

http://git-wip-us.apache.org/repos/asf/flink/blob/d95d20eb/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
index 6480d75..10d6a72 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
@@ -52,6 +52,8 @@ public class ResourceManagerJobMasterTest extends TestLogger {
 
        private TestingSerialRpcService rpcService;
 
+       private final Time timeout = Time.milliseconds(0L);
+
        @Before
        public void setup() throws Exception {
                rpcService = new TestingSerialRpcService();
@@ -83,7 +85,8 @@ public class ResourceManagerJobMasterTest extends TestLogger {
                        jmLeaderID,
                        jmResourceId,
                        jobMasterAddress,
-                       jobID);
+                       jobID,
+                       timeout);
                RegistrationResponse response = successfulFuture.get(5L, 
TimeUnit.SECONDS);
                assertTrue(response instanceof JobMasterRegistrationSuccess);
 
@@ -114,7 +117,8 @@ public class ResourceManagerJobMasterTest extends 
TestLogger {
                        jmLeaderID,
                        jmResourceId,
                        jobMasterAddress,
-                       jobID);
+                       jobID,
+                       timeout);
                assertTrue(unMatchedLeaderFuture.get(5, TimeUnit.SECONDS) 
instanceof RegistrationResponse.Decline);
 
                if (testingFatalErrorHandler.hasExceptionOccurred()) {
@@ -146,7 +150,8 @@ public class ResourceManagerJobMasterTest extends 
TestLogger {
                        differentLeaderSessionID,
                        jmResourceId,
                        jobMasterAddress,
-                       jobID);
+                       jobID,
+                       timeout);
                assertTrue(unMatchedLeaderFuture.get(5, TimeUnit.SECONDS) 
instanceof RegistrationResponse.Decline);
 
                if (testingFatalErrorHandler.hasExceptionOccurred()) {
@@ -178,7 +183,8 @@ public class ResourceManagerJobMasterTest extends 
TestLogger {
                        jmLeaderSessionId,
                        jmResourceId,
                        invalidAddress,
-                       jobID);
+                       jobID,
+                       timeout);
                assertTrue(invalidAddressFuture.get(5, TimeUnit.SECONDS) 
instanceof RegistrationResponse.Decline);
 
                if (testingFatalErrorHandler.hasExceptionOccurred()) {
@@ -210,7 +216,8 @@ public class ResourceManagerJobMasterTest extends 
TestLogger {
                        jmLeaderSessionId,
                        jmResourceId,
                        jobMasterAddress,
-                       unknownJobIDToHAServices);
+                       unknownJobIDToHAServices,
+                       timeout);
                RegistrationResponse response = declineFuture.get(5, 
TimeUnit.SECONDS);
                assertTrue(response instanceof RegistrationResponse.Decline);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d95d20eb/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
index 4127cea..fc96f0d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
@@ -90,13 +90,13 @@ public class ResourceManagerTaskExecutorTest extends 
TestLogger {
                try {
                        // test response successful
                        CompletableFuture<RegistrationResponse> 
successfulFuture =
-                               
resourceManager.registerTaskExecutor(leaderSessionId, taskExecutorAddress, 
taskExecutorResourceID, slotReport);
+                               
resourceManager.registerTaskExecutor(leaderSessionId, taskExecutorAddress, 
taskExecutorResourceID, slotReport, Time.milliseconds(0L));
                        RegistrationResponse response = successfulFuture.get(5, 
TimeUnit.SECONDS);
                        assertTrue(response instanceof 
TaskExecutorRegistrationSuccess);
 
                        // test response successful with instanceID not equal 
to previous when receive duplicate registration from taskExecutor
                        CompletableFuture<RegistrationResponse> duplicateFuture 
=
-                               
resourceManager.registerTaskExecutor(leaderSessionId, taskExecutorAddress, 
taskExecutorResourceID, slotReport);
+                               
resourceManager.registerTaskExecutor(leaderSessionId, taskExecutorAddress, 
taskExecutorResourceID, slotReport, Time.milliseconds(0L));
                        RegistrationResponse duplicateResponse = 
duplicateFuture.get();
                        assertTrue(duplicateResponse instanceof 
TaskExecutorRegistrationSuccess);
                        assertNotEquals(((TaskExecutorRegistrationSuccess) 
response).getRegistrationId(), ((TaskExecutorRegistrationSuccess) 
duplicateResponse).getRegistrationId());
@@ -116,7 +116,7 @@ public class ResourceManagerTaskExecutorTest extends 
TestLogger {
                        // test throw exception when receive a registration 
from taskExecutor which takes unmatched leaderSessionId
                        UUID differentLeaderSessionID = UUID.randomUUID();
                        CompletableFuture<RegistrationResponse> 
unMatchedLeaderFuture =
-                               
resourceManager.registerTaskExecutor(differentLeaderSessionID, 
taskExecutorAddress, taskExecutorResourceID, slotReport);
+                               
resourceManager.registerTaskExecutor(differentLeaderSessionID, 
taskExecutorAddress, taskExecutorResourceID, slotReport, Time.milliseconds(0L));
                        assertTrue(unMatchedLeaderFuture.get(5, 
TimeUnit.SECONDS) instanceof RegistrationResponse.Decline);
                } finally {
                        if (testingFatalErrorHandler.hasExceptionOccurred()) {
@@ -134,7 +134,7 @@ public class ResourceManagerTaskExecutorTest extends 
TestLogger {
                        // test throw exception when receive a registration 
from taskExecutor which takes invalid address
                        String invalidAddress = "/taskExecutor2";
                        CompletableFuture<RegistrationResponse> 
invalidAddressFuture =
-                               
resourceManager.registerTaskExecutor(leaderSessionId, invalidAddress, 
taskExecutorResourceID, slotReport);
+                               
resourceManager.registerTaskExecutor(leaderSessionId, invalidAddress, 
taskExecutorResourceID, slotReport, Time.milliseconds(0L));
                        assertTrue(invalidAddressFuture.get(5, 
TimeUnit.SECONDS) instanceof RegistrationResponse.Decline);
                } finally {
                        if (testingFatalErrorHandler.hasExceptionOccurred()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/d95d20eb/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
index 4be5257..00762b9 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
@@ -67,7 +67,7 @@ public class AsyncCallsTest extends TestLogger {
 
                TestEndpoint testEndpoint = new TestEndpoint(akkaRpcService, 
lock);
                testEndpoint.start();
-               TestGateway gateway = testEndpoint.getSelf();
+               TestGateway gateway = 
testEndpoint.getSelfGateway(TestGateway.class);
 
                // a bunch of gateway calls
                gateway.someCall();
@@ -108,7 +108,7 @@ public class AsyncCallsTest extends TestLogger {
                assertFalse("Rpc Endpoint had concurrent access", 
testEndpoint.hasConcurrentAccess());
                assertFalse("Rpc Endpoint had concurrent access", 
concurrentAccess.get());
 
-               akkaRpcService.stopServer(testEndpoint.getSelf());
+               testEndpoint.shutDown();
        }
 
        @Test
@@ -174,7 +174,7 @@ public class AsyncCallsTest extends TestLogger {
        }
 
        @SuppressWarnings("unused")
-       public static class TestEndpoint extends RpcEndpoint<TestGateway> {
+       public static class TestEndpoint extends RpcEndpoint implements 
TestGateway {
 
                private final ReentrantLock lock;
 
@@ -185,7 +185,7 @@ public class AsyncCallsTest extends TestLogger {
                        this.lock = lock;
                }
 
-               @RpcMethod
+               @Override
                public void someCall() {
                        boolean holdsLock = lock.tryLock();
                        if (holdsLock) {
@@ -195,7 +195,7 @@ public class AsyncCallsTest extends TestLogger {
                        }
                }
 
-               @RpcMethod
+               @Override
                public void anotherCall() {
                        boolean holdsLock = lock.tryLock();
                        if (holdsLock) {

http://git-wip-us.apache.org/repos/asf/flink/blob/d95d20eb/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
deleted file mode 100644
index 07dadae..0000000
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
+++ /dev/null
@@ -1,452 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rpc;
-
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.util.ReflectionUtil;
-import org.apache.flink.util.TestLogger;
-import org.junit.Test;
-import org.reflections.Reflections;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.lang.annotation.Annotation;
-import java.lang.reflect.Method;
-import java.lang.reflect.Modifier;
-import java.lang.reflect.Type;
-import java.lang.reflect.TypeVariable;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-/**
- * Test which ensures that all classes of subtype {@link RpcEndpoint} implement
- * the methods specified in the generic gateway type argument.
- *
- * {@code
- *         RpcEndpoint<GatewayTypeParameter extends RpcGateway>
- * }
- *
- * Note, that the class hierarchy can also be nested. In this case the type 
argument
- * always has to be the first argument, e.g. {@code
- *
- *         // RpcClass needs to implement RpcGatewayClass' methods
- *         RpcClass extends RpcEndpoint<RpcGatewayClass>
- *
- *         // RpcClass2 or its subclass needs to implement RpcGatewayClass' 
methods
- *      RpcClass<GatewayTypeParameter extends RpcGateway,...> extends 
RpcEndpoint<GatewayTypeParameter>
- *      RpcClass2 extends RpcClass<RpcGatewayClass,...>
- *
- *      // needless to say, this can even be nested further
- *      ...
- * }
- *
- */
-public class RpcCompletenessTest extends TestLogger {
-
-       private static Logger LOG = 
LoggerFactory.getLogger(RpcCompletenessTest.class);
-
-       private static final Class<?> futureClass = CompletableFuture.class;
-       private static final Class<?> timeoutClass = Time.class;
-
-       @Test
-       @SuppressWarnings({"rawtypes", "unchecked"})
-       public void testRpcCompleteness() {
-               Reflections reflections = new Reflections("org.apache.flink");
-
-               Set<Class<? extends RpcEndpoint>> classes = 
reflections.getSubTypesOf(RpcEndpoint.class);
-
-               Class<? extends RpcEndpoint> c;
-
-               mainloop:
-               for (Class<? extends RpcEndpoint> rpcEndpoint : classes) {
-                       c = rpcEndpoint;
-
-                       LOG.debug("-------------");
-                       LOG.debug("c: {}", c);
-
-                       // skip abstract classes
-                       if (Modifier.isAbstract(c.getModifiers())) {
-                               LOG.debug("Skipping abstract class");
-                               continue;
-                       }
-
-                       // check for type parameter bound to RpcGateway
-                       // skip if one is found because a subclass will provide 
the concrete argument
-                       TypeVariable<? extends Class<? extends RpcEndpoint>>[] 
typeParameters = c.getTypeParameters();
-                       LOG.debug("Checking {} parameters.", 
typeParameters.length);
-                       for (int i = 0; i < typeParameters.length; i++) {
-                               for (Type bound : 
typeParameters[i].getBounds()) {
-                                       LOG.debug("checking bound {} of type 
parameter {}", bound, typeParameters[i]);
-                                       if (bound.toString().equals("interface 
" + RpcGateway.class.getName())) {
-                                               if (i > 0) {
-                                                       fail("Type parameter 
for RpcGateway should come first in " + c);
-                                               }
-                                               LOG.debug("Skipping class with 
type parameter bound to RpcGateway.");
-                                               // Type parameter is bound to 
RpcGateway which a subclass will provide
-                                               continue mainloop;
-                                       }
-                               }
-                       }
-
-                       // check if this class or any super class contains the 
RpcGateway argument
-                       Class<?> rpcGatewayType;
-                       do {
-                               LOG.debug("checking type argument of class: 
{}", c);
-                               rpcGatewayType = 
ReflectionUtil.getTemplateType1(c);
-                               LOG.debug("type argument is: {}", 
rpcGatewayType);
-
-                               c = (Class<? extends RpcEndpoint>) 
c.getSuperclass();
-
-                       } while 
(!RpcGateway.class.isAssignableFrom(rpcGatewayType));
-
-                       LOG.debug("Checking RRC completeness of endpoint '{}' 
with gateway '{}'",
-                               rpcEndpoint.getSimpleName(), 
rpcGatewayType.getSimpleName());
-
-                       checkCompleteness(rpcEndpoint, (Class<? extends 
RpcGateway>) rpcGatewayType);
-               }
-       }
-
-       @SuppressWarnings("rawtypes")
-       private void checkCompleteness(Class<? extends RpcEndpoint> 
rpcEndpoint, Class<? extends RpcGateway> rpcGateway) {
-               List<Method> rpcMethodsFromGateway = 
getRpcMethodsFromGateway(rpcGateway);
-               Method[] gatewayMethods = rpcMethodsFromGateway.toArray(new 
Method[rpcMethodsFromGateway.size()]);
-               Method[] serverMethods = rpcEndpoint.getMethods();
-
-               Map<String, Set<Method>> rpcMethods = new HashMap<>();
-               Set<Method> unmatchedRpcMethods = new HashSet<>();
-
-               for (Method serverMethod : serverMethods) {
-                       if (serverMethod.isAnnotationPresent(RpcMethod.class)) {
-                               if 
(rpcMethods.containsKey(serverMethod.getName())) {
-                                       Set<Method> methods = 
rpcMethods.get(serverMethod.getName());
-                                       methods.add(serverMethod);
-
-                                       rpcMethods.put(serverMethod.getName(), 
methods);
-                               } else {
-                                       Set<Method> methods = new HashSet<>();
-                                       methods.add(serverMethod);
-
-                                       rpcMethods.put(serverMethod.getName(), 
methods);
-                               }
-
-                               unmatchedRpcMethods.add(serverMethod);
-                       }
-               }
-
-               for (Method gatewayMethod : gatewayMethods) {
-                       assertTrue(
-                               "The rpc endpoint " + rpcEndpoint.getName() + " 
does not contain a RpcMethod " +
-                                       "annotated method with the same name 
and signature " +
-                                       
generateEndpointMethodSignature(gatewayMethod) + ".",
-                               
rpcMethods.containsKey(gatewayMethod.getName()));
-
-                       checkGatewayMethod(gatewayMethod);
-
-                       if (!matchGatewayMethodWithEndpoint(gatewayMethod, 
rpcMethods.get(gatewayMethod.getName()), unmatchedRpcMethods)) {
-                               fail("Could not find a RpcMethod annotated 
method in rpc endpoint " +
-                                       rpcEndpoint.getName() + " matching the 
rpc gateway method " +
-                                       
generateEndpointMethodSignature(gatewayMethod) + " defined in the rpc gateway " 
+
-                                       rpcGateway.getName() + ".");
-                       }
-               }
-
-               if (!unmatchedRpcMethods.isEmpty()) {
-                       StringBuilder builder = new StringBuilder();
-
-                       for (Method unmatchedRpcMethod : unmatchedRpcMethods) {
-                               builder.append(unmatchedRpcMethod).append("\n");
-                       }
-
-                       fail("The rpc endpoint " + rpcEndpoint.getName() + " 
contains rpc methods which " +
-                               "are not matched to gateway methods of " + 
rpcGateway.getName() + ":\n" +
-                               builder.toString());
-               }
-       }
-
-       /**
-        * Checks whether the gateway method fulfills the gateway method 
requirements.
-        * <ul>
-        *     <li>It checks whether the return type is void or a {@link 
CompletableFuture} wrapping the actual result. </li>
-        *     <li>It checks that the method's parameter list contains at most 
one parameter annotated with {@link RpcTimeout}.</li>
-        * </ul>
-        *
-        * @param gatewayMethod Gateway method to check
-        */
-       private void checkGatewayMethod(Method gatewayMethod) {
-               if (!gatewayMethod.getReturnType().equals(Void.TYPE)) {
-                       assertTrue(
-                               "The return type of method " + 
gatewayMethod.getName() + " in the rpc gateway " +
-                                       
gatewayMethod.getDeclaringClass().getName() + " is non void and not a " +
-                                       "future. Non-void return types have to 
be returned as a future.",
-                               
gatewayMethod.getReturnType().equals(futureClass));
-               }
-
-               Annotation[][] parameterAnnotations = 
gatewayMethod.getParameterAnnotations();
-               Class<?>[] parameterTypes = gatewayMethod.getParameterTypes();
-               int rpcTimeoutParameters = 0;
-
-               for (int i = 0; i < parameterAnnotations.length; i++) {
-                       if 
(RpcCompletenessTest.isRpcTimeout(parameterAnnotations[i])) {
-                               assertTrue(
-                                       "The rpc timeout has to be of type " + 
timeoutClass.getName() + ".",
-                                       parameterTypes[i].equals(timeoutClass));
-
-                               rpcTimeoutParameters++;
-                       }
-               }
-
-               assertTrue("The gateway method " + gatewayMethod + " must have 
at most one RpcTimeout " +
-                       "annotated parameter.", rpcTimeoutParameters <= 1);
-       }
-
-       /**
-        * Checks whether we find a matching overloaded version for the gateway 
method among the methods
-        * with the same name in the rpc endpoint.
-        *
-        * @param gatewayMethod Gateway method
-        * @param endpointMethods Set of rpc methods on the rpc endpoint with 
the same name as the gateway
-        *                   method
-        * @param unmatchedRpcMethods Set of unmatched rpc methods on the 
endpoint side (so far)
-        */
-       private boolean matchGatewayMethodWithEndpoint(Method gatewayMethod, 
Set<Method> endpointMethods, Set<Method> unmatchedRpcMethods) {
-               for (Method endpointMethod : endpointMethods) {
-                       if (checkMethod(gatewayMethod, endpointMethod)) {
-                               unmatchedRpcMethods.remove(endpointMethod);
-                               return true;
-                       }
-               }
-
-               return false;
-       }
-
-       private boolean checkMethod(Method gatewayMethod, Method 
endpointMethod) {
-               Class<?>[] gatewayParameterTypes = 
gatewayMethod.getParameterTypes();
-               Annotation[][] gatewayParameterAnnotations = 
gatewayMethod.getParameterAnnotations();
-
-               Class<?>[] endpointParameterTypes = 
endpointMethod.getParameterTypes();
-
-               List<Class<?>> filteredGatewayParameterTypes = new 
ArrayList<>();
-
-               assertEquals(gatewayParameterTypes.length, 
gatewayParameterAnnotations.length);
-
-               // filter out the RpcTimeout parameters
-               for (int i = 0; i < gatewayParameterTypes.length; i++) {
-                       if 
(!RpcCompletenessTest.isRpcTimeout(gatewayParameterAnnotations[i])) {
-                               
filteredGatewayParameterTypes.add(gatewayParameterTypes[i]);
-                       }
-               }
-
-               if (filteredGatewayParameterTypes.size() != 
endpointParameterTypes.length) {
-                       return false;
-               } else {
-                       // check the parameter types
-                       for (int i = 0; i < 
filteredGatewayParameterTypes.size(); i++) {
-                               if 
(!checkType(filteredGatewayParameterTypes.get(i), endpointParameterTypes[i])) {
-                                       return false;
-                               }
-                       }
-
-                       // check the return types
-                       if (endpointMethod.getReturnType() == void.class) {
-                               if (gatewayMethod.getReturnType() != 
void.class) {
-                                       return false;
-                               }
-                       } else {
-                               // has return value. The gateway method should 
be wrapped in a future
-                               Class<?> futureClass = 
gatewayMethod.getReturnType();
-
-                               // sanity check that the return type of a 
gateway method must be void or a future
-                               if 
(!futureClass.equals(RpcCompletenessTest.futureClass)) {
-                                       return false;
-                               } else {
-                                       ReflectionUtil.FullTypeInfo 
fullValueTypeInfo = 
ReflectionUtil.getFullTemplateType(gatewayMethod.getGenericReturnType(), 0);
-
-                                       if 
(endpointMethod.getReturnType().equals(futureClass)) {
-                                               ReflectionUtil.FullTypeInfo 
fullRpcEndpointValueTypeInfo = 
ReflectionUtil.getFullTemplateType(endpointMethod.getGenericReturnType(), 0);
-
-                                               // check if we have the same 
future value types
-                                               if (fullValueTypeInfo != null 
&& fullRpcEndpointValueTypeInfo != null) {
-                                                       Iterator<Class<?>> 
valueClasses = fullValueTypeInfo.getClazzIterator();
-                                                       Iterator<Class<?>> 
rpcClasses = fullRpcEndpointValueTypeInfo.getClazzIterator();
-
-                                                       while 
(valueClasses.hasNext() && rpcClasses.hasNext()) {
-                                                               if 
(!checkType(valueClasses.next(), rpcClasses.next())) {
-                                                                       return 
false;
-                                                               }
-                                                       }
-
-                                                       // both should be empty
-                                                       return 
!valueClasses.hasNext() && !rpcClasses.hasNext();
-                                               }
-                                       } else {
-                                               if (fullValueTypeInfo != null 
&& !checkType(fullValueTypeInfo.getClazz(), endpointMethod.getReturnType())) {
-                                                       return false;
-                                               }
-                                       }
-                               }
-                       }
-
-                       return 
gatewayMethod.getName().equals(endpointMethod.getName());
-               }
-       }
-
-       private boolean checkType(Class<?> firstType, Class<?> secondType) {
-               Class<?> firstResolvedType;
-               Class<?> secondResolvedType;
-
-               if (firstType.isPrimitive()) {
-                       firstResolvedType = 
RpcCompletenessTest.resolvePrimitiveType(firstType);
-               } else {
-                       firstResolvedType = firstType;
-               }
-
-               if (secondType.isPrimitive()) {
-                       secondResolvedType = 
RpcCompletenessTest.resolvePrimitiveType(secondType);
-               } else {
-                       secondResolvedType = secondType;
-               }
-
-               return firstResolvedType.equals(secondResolvedType);
-       }
-
-       /**
-        * Generates from a gateway rpc method signature the corresponding rpc 
endpoint signature.
-        *
-        * For example the {@link RpcTimeout} annotation adds an additional 
parameter to the gateway
-        * signature which is not relevant on the server side.
-        *
-        * @param method Method to generate the signature string for
-        * @return String of the respective server side rpc method signature
-        */
-       private String generateEndpointMethodSignature(Method method) {
-               StringBuilder builder = new StringBuilder();
-
-               if (method.getReturnType().equals(Void.TYPE)) {
-                       builder.append("void").append(" ");
-               } else if (method.getReturnType().equals(futureClass)) {
-                       ReflectionUtil.FullTypeInfo fullTypeInfo = 
ReflectionUtil.getFullTemplateType(method.getGenericReturnType(), 0);
-
-                       builder
-                               .append(futureClass.getSimpleName())
-                               .append("<")
-                               .append(fullTypeInfo != null ? 
fullTypeInfo.toString() : "")
-                               .append(">");
-
-                       if (fullTypeInfo != null) {
-                               builder.append("/").append(fullTypeInfo);
-                       }
-
-                       builder.append(" ");
-               } else {
-                       return "Invalid rpc method signature.";
-               }
-
-               builder.append(method.getName()).append("(");
-
-               Class<?>[] parameterTypes = method.getParameterTypes();
-               Annotation[][] parameterAnnotations = 
method.getParameterAnnotations();
-
-               assertEquals(parameterTypes.length, 
parameterAnnotations.length);
-
-               for (int i = 0; i < parameterTypes.length; i++) {
-                       // filter out the RpcTimeout parameters
-                       if 
(!RpcCompletenessTest.isRpcTimeout(parameterAnnotations[i])) {
-                               builder.append(parameterTypes[i].getName());
-
-                               if (i < parameterTypes.length -1) {
-                                       builder.append(", ");
-                               }
-                       }
-               }
-
-               builder.append(")");
-
-               return builder.toString();
-       }
-
-       private static boolean isRpcTimeout(Annotation[] annotations) {
-               for (Annotation annotation : annotations) {
-                       if 
(annotation.annotationType().equals(RpcTimeout.class)) {
-                               return true;
-                       }
-               }
-
-               return false;
-       }
-
-       /**
-        * Returns the boxed type for a primitive type.
-        *
-        * @param primitveType Primitive type to resolve
-        * @return Boxed type for the given primitive type
-        */
-       private static Class<?> resolvePrimitiveType(Class<?> primitveType) {
-               assert primitveType.isPrimitive();
-
-               TypeInformation<?> typeInformation = 
BasicTypeInfo.getInfoFor(primitveType);
-
-               if (typeInformation != null) {
-                       return typeInformation.getTypeClass();
-               } else {
-                       throw new RuntimeException("Could not retrive basic 
type information for primitive type " + primitveType + '.');
-               }
-       }
-
-       /**
-        * Extract all rpc methods defined by the gateway interface
-        *
-        * @param interfaceClass the given rpc gateway interface
-        * @return all methods defined by the given interface
-        */
-       private List<Method> getRpcMethodsFromGateway(Class<? extends 
RpcGateway> interfaceClass) {
-               if(!interfaceClass.isInterface()) {
-                       fail(interfaceClass.getName() + " is not a interface");
-               }
-
-               ArrayList<Method> allMethods = new ArrayList<>();
-               // Methods defined in RpcGateway are native method
-               if(interfaceClass.equals(RpcGateway.class)) {
-                       return allMethods;
-               }
-
-               // Get all methods declared in current interface
-               Collections.addAll(allMethods, 
interfaceClass.getDeclaredMethods());
-
-               // Get all method inherited from super interface
-               for (Class<?> superClass : interfaceClass.getInterfaces()) {
-                       @SuppressWarnings("unchecked")
-                       Class<? extends RpcGateway> gatewayClass = (Class<? 
extends RpcGateway>) superClass;
-                       
allMethods.addAll(getRpcMethodsFromGateway(gatewayClass));
-               }
-               return allMethods;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d95d20eb/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java
new file mode 100644
index 0000000..b3e8ee6
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.util.TestLogger;
+
+import akka.actor.ActorSystem;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import scala.concurrent.duration.FiniteDuration;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the RpcEndpoint and its self gateways.
+ */
+public class RpcEndpointTest extends TestLogger {
+
+       private static final Time TIMEOUT = Time.seconds(10L);
+       private static ActorSystem actorSystem = null;
+       private static RpcService rpcService = null;
+
+       @BeforeClass
+       public static void setup() {
+               actorSystem = AkkaUtils.createDefaultActorSystem();
+               rpcService = new AkkaRpcService(actorSystem, TIMEOUT);
+       }
+
+       @AfterClass
+       public static void teardown() throws Exception {
+               if (rpcService != null) {
+                       rpcService.stopService();
+               }
+
+               if (actorSystem != null) {
+                       actorSystem.shutdown();
+               }
+
+               if (rpcService != null) {
+                       
rpcService.getTerminationFuture().get(TIMEOUT.toMilliseconds(), 
TimeUnit.MILLISECONDS);
+               }
+
+               if (actorSystem != null) {
+                       actorSystem.awaitTermination(new 
FiniteDuration(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS));
+               }
+       }
+
+       /**
+        * Tests that we can obtain the self gateway from an RpcEndpoint and can 
interact with
+        * it via the self gateway.
+        */
+       @Test
+       public void testSelfGateway() throws Exception {
+               int expectedValue = 1337;
+               BaseEndpoint baseEndpoint = new BaseEndpoint(rpcService, 
expectedValue);
+
+               try {
+                       baseEndpoint.start();
+
+                       BaseGateway baseGateway = 
baseEndpoint.getSelfGateway(BaseGateway.class);
+
+                       CompletableFuture<Integer> foobar = 
baseGateway.foobar();
+
+                       assertEquals(Integer.valueOf(expectedValue), 
foobar.get());
+               } finally {
+                       baseEndpoint.shutDown();
+               }
+       }
+
+       /**
+        * Tests that we cannot accidentally obtain a wrong self gateway type 
which is
+        * not implemented by the RpcEndpoint.
+        */
+       @Test(expected = RuntimeException.class)
+       public void testWrongSelfGateway() throws Exception {
+               int expectedValue = 1337;
+               BaseEndpoint baseEndpoint = new BaseEndpoint(rpcService, 
expectedValue);
+
+               try {
+                       baseEndpoint.start();
+
+                       DifferentGateway differentGateway = 
baseEndpoint.getSelfGateway(DifferentGateway.class);
+
+                       fail("Expected to fail with a RuntimeException since we 
requested the wrong gateway type.");
+               } finally {
+                       baseEndpoint.shutDown();
+               }
+       }
+
+       /**
+        * Tests that we can extend existing RpcEndpoints and can communicate 
with them via the
+        * self gateways.
+        */
+       @Test
+       public void testEndpointInheritance() throws Exception {
+               int foobar = 1;
+               int barfoo = 2;
+               String foo = "foobar";
+
+               ExtendedEndpoint endpoint = new ExtendedEndpoint(rpcService, 
foobar, barfoo, foo);
+
+               try {
+                       endpoint.start();
+
+                       BaseGateway baseGateway = 
endpoint.getSelfGateway(BaseGateway.class);
+                       ExtendedGateway extendedGateway = 
endpoint.getSelfGateway(ExtendedGateway.class);
+                       DifferentGateway differentGateway = 
endpoint.getSelfGateway(DifferentGateway.class);
+
+                       assertEquals(Integer.valueOf(foobar), 
baseGateway.foobar().get());
+                       assertEquals(Integer.valueOf(foobar), 
extendedGateway.foobar().get());
+
+                       assertEquals(Integer.valueOf(barfoo), 
extendedGateway.barfoo().get());
+                       assertEquals(foo, differentGateway.foo().get());
+               } finally {
+                       endpoint.shutDown();
+               }
+       }
+
+       public interface BaseGateway extends RpcGateway {
+               CompletableFuture<Integer> foobar();
+       }
+
+       public interface ExtendedGateway extends BaseGateway {
+               CompletableFuture<Integer> barfoo();
+       }
+
+       public interface DifferentGateway extends RpcGateway {
+               CompletableFuture<String> foo();
+       }
+
+       public static class BaseEndpoint extends RpcEndpoint implements 
BaseGateway {
+
+               private final int foobarValue;
+
+               protected BaseEndpoint(RpcService rpcService, int foobarValue) {
+                       super(rpcService);
+
+                       this.foobarValue = foobarValue;
+               }
+
+               @Override
+               public CompletableFuture<Integer> foobar() {
+                       return CompletableFuture.completedFuture(foobarValue);
+               }
+       }
+
+       public static class ExtendedEndpoint extends BaseEndpoint implements 
ExtendedGateway, DifferentGateway {
+
+               private final int barfooValue;
+
+               private final String fooString;
+
+               protected ExtendedEndpoint(RpcService rpcService, int 
foobarValue, int barfooValue, String fooString) {
+                       super(rpcService, foobarValue);
+
+                       this.barfooValue = barfooValue;
+                       this.fooString = fooString;
+               }
+
+               @Override
+               public CompletableFuture<Integer> barfoo() {
+                       return CompletableFuture.completedFuture(barfooValue);
+               }
+
+               @Override
+               public CompletableFuture<String> foo() {
+                       return CompletableFuture.completedFuture(fooString);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d95d20eb/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
index 37349a1..cb38f6f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.rpc;
 
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
@@ -30,8 +29,8 @@ import java.lang.annotation.Annotation;
 import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.Method;
 import java.lang.reflect.Proxy;
-import java.util.BitSet;
 import java.util.List;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
@@ -139,32 +138,32 @@ public class TestingSerialRpcService implements 
RpcService {
        }
 
        @Override
-       public void stopServer(RpcGateway selfGateway) {
+       public void stopServer(RpcServer selfGateway) {
                registeredConnections.remove(selfGateway.getAddress());
        }
 
        @Override
-       public <C extends RpcGateway, S extends RpcEndpoint<C>> C startServer(S 
rpcEndpoint) {
+       public <S extends RpcEndpoint & RpcGateway> RpcServer startServer(S 
rpcEndpoint) {
                final String address = UUID.randomUUID().toString();
 
                InvocationHandler akkaInvocationHandler = new 
TestingSerialRpcService.TestingSerialInvocationHandler<>(address, rpcEndpoint);
                ClassLoader classLoader = getClass().getClassLoader();
 
+               Set<Class<? extends RpcGateway>> implementedRpcGateways = 
RpcUtils.extractImplementedRpcGateways(rpcEndpoint.getClass());
+
+               implementedRpcGateways.add(RpcServer.class);
+
+
                @SuppressWarnings("unchecked")
-               C self = (C) Proxy.newProxyInstance(
+               RpcServer rpcServer = (RpcServer) Proxy.newProxyInstance(
                        classLoader,
-                       new Class<?>[]{
-                               rpcEndpoint.getSelfGatewayType(),
-                               MainThreadExecutable.class,
-                               StartStoppable.class,
-                               RpcGateway.class
-                       },
+                       implementedRpcGateways.toArray(new 
Class<?>[implementedRpcGateways.size()]),
                        akkaInvocationHandler);
 
                // register self
-               registeredConnections.putIfAbsent(self.getAddress(), self);
+               registeredConnections.putIfAbsent(rpcServer.getAddress(), 
rpcServer);
 
-               return self;
+               return rpcServer;
        }
 
        @Override
@@ -211,7 +210,7 @@ public class TestingSerialRpcService implements RpcService {
                registeredConnections.clear();
        }
 
-       private static final class TestingSerialInvocationHandler<C extends 
RpcGateway, T extends RpcEndpoint<C>> implements InvocationHandler, RpcGateway, 
MainThreadExecutable, StartStoppable {
+       private static final class TestingSerialInvocationHandler<T extends 
RpcEndpoint & RpcGateway> implements InvocationHandler, RpcGateway, 
MainThreadExecutable, StartStoppable {
 
                private final T rpcEndpoint;
 
@@ -234,7 +233,9 @@ public class TestingSerialRpcService implements RpcService {
                public Object invoke(Object proxy, Method method, Object[] 
args) throws Throwable {
                        Class<?> declaringClass = method.getDeclaringClass();
                        if (declaringClass.equals(MainThreadExecutable.class) ||
-                               declaringClass.equals(Object.class) || 
declaringClass.equals(StartStoppable.class) ||
+                               declaringClass.equals(Object.class) ||
+                               declaringClass.equals(StartStoppable.class) ||
+                               declaringClass.equals(RpcServer.class) ||
                                declaringClass.equals(RpcGateway.class)) {
                                return method.invoke(this, args);
                        } else {
@@ -243,22 +244,17 @@ public class TestingSerialRpcService implements 
RpcService {
                                Annotation[][] parameterAnnotations = 
method.getParameterAnnotations();
                                Time futureTimeout = 
extractRpcTimeout(parameterAnnotations, args, timeout);
 
-                               final Tuple2<Class<?>[], Object[]> 
filteredArguments = filterArguments(
-                                       parameterTypes,
-                                       parameterAnnotations,
-                                       args);
-
                                Class<?> returnType = method.getReturnType();
 
                                if (returnType.equals(CompletableFuture.class)) 
{
                                        try {
-                                               Object result = 
handleRpcInvocationSync(methodName, filteredArguments.f0, filteredArguments.f1, 
futureTimeout);
+                                               Object result = 
handleRpcInvocationSync(methodName, parameterTypes, args, futureTimeout);
                                                return 
CompletableFuture.completedFuture(result);
                                        } catch (Throwable e) {
                                                return 
FutureUtils.completedExceptionally(e);
                                        }
                                } else {
-                                       return 
handleRpcInvocationSync(methodName, filteredArguments.f0, filteredArguments.f1, 
futureTimeout);
+                                       return 
handleRpcInvocationSync(methodName, parameterTypes, args, futureTimeout);
                                }
                        }
                }
@@ -380,61 +376,6 @@ public class TestingSerialRpcService implements RpcService 
{
                }
 
                /**
-                * Removes all {@link RpcTimeout} annotated parameters from the 
parameter type and argument
-                * list.
-                *
-                * @param parameterTypes       Array of parameter types
-                * @param parameterAnnotations Array of parameter annotations
-                * @param args                 Arary of arguments
-                * @return Tuple of filtered parameter types and arguments 
which no longer contain the
-                * {@link RpcTimeout} annotated parameter types and arguments
-                */
-               private static Tuple2<Class<?>[], Object[]> filterArguments(
-                       Class<?>[] parameterTypes,
-                       Annotation[][] parameterAnnotations,
-                       Object[] args) {
-
-                       Class<?>[] filteredParameterTypes;
-                       Object[] filteredArgs;
-
-                       if (args == null) {
-                               filteredParameterTypes = parameterTypes;
-                               filteredArgs = null;
-                       } else {
-                               
Preconditions.checkArgument(parameterTypes.length == 
parameterAnnotations.length);
-                               
Preconditions.checkArgument(parameterAnnotations.length == args.length);
-
-                               BitSet isRpcTimeoutParameter = new 
BitSet(parameterTypes.length);
-                               int numberRpcParameters = parameterTypes.length;
-
-                               for (int i = 0; i < parameterTypes.length; i++) 
{
-                                       if 
(isRpcTimeout(parameterAnnotations[i])) {
-                                               isRpcTimeoutParameter.set(i);
-                                               numberRpcParameters--;
-                                       }
-                               }
-
-                               if (numberRpcParameters == 
parameterTypes.length) {
-                                       filteredParameterTypes = parameterTypes;
-                                       filteredArgs = args;
-                               } else {
-                                       filteredParameterTypes = new 
Class<?>[numberRpcParameters];
-                                       filteredArgs = new 
Object[numberRpcParameters];
-                                       int counter = 0;
-
-                                       for (int i = 0; i < 
parameterTypes.length; i++) {
-                                               if 
(!isRpcTimeoutParameter.get(i)) {
-                                                       
filteredParameterTypes[counter] = parameterTypes[i];
-                                                       filteredArgs[counter] = 
args[i];
-                                                       counter++;
-                                               }
-                                       }
-                               }
-                       }
-                       return Tuple2.of(filteredParameterTypes, filteredArgs);
-               }
-
-               /**
                 * Checks whether any of the annotations is of type {@link 
RpcTimeout}
                 *
                 * @param annotations Array of annotations

http://git-wip-us.apache.org/repos/asf/flink/blob/d95d20eb/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
index 793d292..56d17e9 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcGateway;
-import org.apache.flink.runtime.rpc.RpcMethod;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcException;
 import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
@@ -105,7 +104,7 @@ public class AkkaRpcActorTest extends TestLogger {
 
                DummyRpcEndpoint rpcEndpoint = new 
DummyRpcEndpoint(akkaRpcService);
 
-               DummyRpcGateway rpcGateway = rpcEndpoint.getSelf();
+               DummyRpcGateway rpcGateway = 
rpcEndpoint.getSelfGateway(DummyRpcGateway.class);
 
                // this message should be discarded and completed with an 
AkkaRpcException
                CompletableFuture<Integer> result = rpcGateway.foobar();
@@ -192,7 +191,7 @@ public class AkkaRpcActorTest extends TestLogger {
                ExceptionalEndpoint rpcEndpoint = new 
ExceptionalEndpoint(akkaRpcService);
                rpcEndpoint.start();
 
-               ExceptionalGateway rpcGateway = rpcEndpoint.getSelf();
+               ExceptionalGateway rpcGateway = 
rpcEndpoint.getSelfGateway(ExceptionalGateway.class);
                CompletableFuture<Integer> result = rpcGateway.doStuff();
 
                try {
@@ -211,7 +210,7 @@ public class AkkaRpcActorTest extends TestLogger {
                ExceptionalFutureEndpoint rpcEndpoint = new 
ExceptionalFutureEndpoint(akkaRpcService);
                rpcEndpoint.start();
 
-               ExceptionalGateway rpcGateway = rpcEndpoint.getSelf();
+               ExceptionalGateway rpcGateway = 
rpcEndpoint.getSelfGateway(ExceptionalGateway.class);
                CompletableFuture<Integer> result = rpcGateway.doStuff();
 
                try {
@@ -275,7 +274,7 @@ public class AkkaRpcActorTest extends TestLogger {
                void tell(String message);
        }
 
-       private static class DummyRpcEndpoint extends 
RpcEndpoint<DummyRpcGateway> {
+       private static class DummyRpcEndpoint extends RpcEndpoint implements 
DummyRpcGateway {
 
                private volatile int _foobar = 42;
 
@@ -283,9 +282,9 @@ public class AkkaRpcActorTest extends TestLogger {
                        super(rpcService);
                }
 
-               @RpcMethod
-               public int foobar() {
-                       return _foobar;
+               @Override
+               public CompletableFuture<Integer> foobar() {
+                       return CompletableFuture.completedFuture(_foobar);
                }
 
                public void setFoobar(int value) {
@@ -299,25 +298,25 @@ public class AkkaRpcActorTest extends TestLogger {
                CompletableFuture<Integer> doStuff();
        }
 
-       private static class ExceptionalEndpoint extends 
RpcEndpoint<ExceptionalGateway> {
+       private static class ExceptionalEndpoint extends RpcEndpoint implements 
ExceptionalGateway {
 
                protected ExceptionalEndpoint(RpcService rpcService) {
                        super(rpcService);
                }
 
-               @RpcMethod
-               public int doStuff() {
+               @Override
+               public CompletableFuture<Integer> doStuff() {
                        throw new RuntimeException("my super specific test 
exception");
                }
        }
 
-       private static class ExceptionalFutureEndpoint extends 
RpcEndpoint<ExceptionalGateway> {
+       private static class ExceptionalFutureEndpoint extends RpcEndpoint 
implements ExceptionalGateway {
 
                protected ExceptionalFutureEndpoint(RpcService rpcService) {
                        super(rpcService);
                }
 
-               @RpcMethod
+               @Override
                public CompletableFuture<Integer> doStuff() {
                        final CompletableFuture<Integer> future = new 
CompletableFuture<>();
 
@@ -338,7 +337,7 @@ public class AkkaRpcActorTest extends TestLogger {
 
        // 
------------------------------------------------------------------------
 
-       private static class SimpleRpcEndpoint extends RpcEndpoint<RpcGateway> {
+       private static class SimpleRpcEndpoint extends RpcEndpoint implements 
RpcGateway {
 
                protected SimpleRpcEndpoint(RpcService rpcService, String 
endpointId) {
                        super(rpcService, endpointId);
@@ -352,7 +351,7 @@ public class AkkaRpcActorTest extends TestLogger {
 
        // 
------------------------------------------------------------------------
 
-       private static class FailingPostStopEndpoint extends 
RpcEndpoint<RpcGateway> {
+       private static class FailingPostStopEndpoint extends RpcEndpoint 
implements RpcGateway {
 
                protected FailingPostStopEndpoint(RpcService rpcService, String 
endpointId) {
                        super(rpcService, endpointId);

http://git-wip-us.apache.org/repos/asf/flink/blob/d95d20eb/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java
index 9f134d8..96a9ee4 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java
@@ -23,7 +23,6 @@ import org.apache.flink.runtime.akka.AkkaUtils;
 
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcGateway;
-import org.apache.flink.runtime.rpc.RpcMethod;
 import org.apache.flink.runtime.rpc.RpcService;
 
 import org.apache.flink.util.TestLogger;
@@ -52,7 +51,7 @@ public class MainThreadValidationTest extends TestLogger {
                        testEndpoint.start();
 
                        // this works, because it is executed as an RPC call
-                       
testEndpoint.getSelf().someConcurrencyCriticalFunction();
+                       
testEndpoint.getSelfGateway(TestGateway.class).someConcurrencyCriticalFunction();
 
                        // this fails, because it is executed directly
                        boolean exceptionThrown;
@@ -65,7 +64,7 @@ public class MainThreadValidationTest extends TestLogger {
                        }
                        assertTrue("should fail with an assertion error", 
exceptionThrown);
 
-                       akkaRpcService.stopServer(testEndpoint.getSelf());
+                       testEndpoint.shutDown();
                }
                finally {
                        akkaRpcService.stopService();
@@ -82,13 +81,13 @@ public class MainThreadValidationTest extends TestLogger {
        }
 
        @SuppressWarnings("unused")
-       public static class TestEndpoint extends RpcEndpoint<TestGateway> {
+       public static class TestEndpoint extends RpcEndpoint implements 
TestGateway {
 
                public TestEndpoint(RpcService rpcService) {
                        super(rpcService);
                }
 
-               @RpcMethod
+               @Override
                public void someConcurrencyCriticalFunction() {
                        validateRunsInMainThread();
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/d95d20eb/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java
index 34cf412..c722980 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java
@@ -25,7 +25,6 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcGateway;
-import org.apache.flink.runtime.rpc.RpcMethod;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.util.TestLogger;
 import org.hamcrest.core.Is;
@@ -85,7 +84,7 @@ public class MessageSerializationTest extends TestLogger {
                TestEndpoint testEndpoint = new TestEndpoint(akkaRpcService1, 
linkedBlockingQueue);
                testEndpoint.start();
 
-               TestGateway testGateway = testEndpoint.getSelf();
+               TestGateway testGateway = 
testEndpoint.getSelfGateway(TestGateway.class);
 
                NonSerializableObject expected = new NonSerializableObject(42);
 
@@ -169,7 +168,7 @@ public class MessageSerializationTest extends TestLogger {
                void foobar(Object object) throws IOException, 
InterruptedException;
        }
 
-       private static class TestEndpoint extends RpcEndpoint<TestGateway> {
+       private static class TestEndpoint extends RpcEndpoint implements 
TestGateway {
 
                private final LinkedBlockingQueue<Object> queue;
 
@@ -178,7 +177,7 @@ public class MessageSerializationTest extends TestLogger {
                        this.queue = queue;
                }
 
-               @RpcMethod
+               @Override
                public void foobar(Object object) throws InterruptedException {
                        queue.put(object);
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/d95d20eb/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
index 53c435e..4c87671 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
@@ -43,6 +43,7 @@ import 
org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
 import org.apache.flink.runtime.resourcemanager.ResourceManager;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.resourcemanager.SlotRequest;
 import org.apache.flink.runtime.resourcemanager.StandaloneResourceManager;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
@@ -167,7 +168,7 @@ public class TaskExecutorITCase extends TestLogger {
                        
any(Time.class))).thenReturn(mock(CompletableFuture.class, RETURNS_MOCKS));
 
 
-               rpcService.registerGateway(rmAddress, 
resourceManager.getSelf());
+               rpcService.registerGateway(rmAddress, 
resourceManager.getSelfGateway(ResourceManagerGateway.class));
                rpcService.registerGateway(jmAddress, jmGateway);
 
                final AllocationID allocationId = new AllocationID();
@@ -189,13 +190,14 @@ public class TaskExecutorITCase extends TestLogger {
                                jmLeaderId,
                                jmResourceId,
                                jmAddress,
-                               jobId);
+                               jobId,
+                               Time.milliseconds(0L));
 
                        RegistrationResponse registrationResponse = 
registrationResponseFuture.get();
 
                        assertTrue(registrationResponse instanceof 
JobMasterRegistrationSuccess);
 
-                       resourceManager.requestSlot(jmLeaderId, rmLeaderId, 
slotRequest);
+                       resourceManager.requestSlot(jmLeaderId, rmLeaderId, 
slotRequest, Time.milliseconds(0L));
 
                        verify(jmGateway).offerSlots(
                                eq(taskManagerResourceId),

Reply via email to