http://git-wip-us.apache.org/repos/asf/flink/blob/04bcb715/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerGateway.java
new file mode 100644
index 0000000..464a261
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerGateway.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.resourcemanager;
+
+import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.runtime.rpc.RpcTimeout;
+import org.apache.flink.runtime.rpc.jobmaster.JobMaster;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+/**
+ * {@link ResourceManager} rpc gateway interface.
+ *
+ * All methods declared here return a {@link Future}, i.e. callers receive the result of the
+ * remote call asynchronously.
+ */
+public interface ResourceManagerGateway extends RpcGateway {
+
+       /**
+        * Registers a {@link JobMaster} at the resource manager, with an explicit timeout.
+        *
+        * @param jobMasterRegistration Job master registration information
+        * @param timeout Timeout for the future to complete (marked with {@link RpcTimeout})
+        * @return Future registration response
+        */
+       Future<RegistrationResponse> registerJobMaster(
+               JobMasterRegistration jobMasterRegistration,
+               @RpcTimeout FiniteDuration timeout);
+
+       /**
+        * Registers a {@link JobMaster} at the resource manager.
+        *
+        * This overload takes no explicit timeout parameter.
+        * NOTE(review): presumably a default timeout of the rpc service applies - confirm.
+        *
+        * @param jobMasterRegistration Job master registration information
+        * @return Future registration response
+        */
+       Future<RegistrationResponse> registerJobMaster(JobMasterRegistration 
jobMasterRegistration);
+
+       /**
+        * Requests a slot from the resource manager.
+        *
+        * @param slotRequest Slot request
+        * @return Future slot assignment
+        */
+       Future<SlotAssignment> requestSlot(SlotRequest slotRequest);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/04bcb715/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/SlotAssignment.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/SlotAssignment.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/SlotAssignment.java
new file mode 100644
index 0000000..86cd8b7
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/SlotAssignment.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.resourcemanager;
+
+import java.io.Serializable;
+
+/**
+ * Serializable slot assignment type. It currently declares no fields besides its
+ * {@code serialVersionUID}.
+ */
+public class SlotAssignment implements Serializable{
+       private static final long serialVersionUID = -6990813455942742322L;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/04bcb715/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/SlotRequest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/SlotRequest.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/SlotRequest.java
new file mode 100644
index 0000000..d8fe268
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/SlotRequest.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.resourcemanager;
+
+import java.io.Serializable;
+
+/**
+ * Serializable slot request type. It currently declares no fields besides its
+ * {@code serialVersionUID}.
+ */
+public class SlotRequest implements Serializable{
+       private static final long serialVersionUID = -6586877187990445986L;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/04bcb715/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java
new file mode 100644
index 0000000..cdfc3bd
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.taskexecutor;
+
+import akka.dispatch.ExecutionContexts$;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.rpc.RpcMethod;
+import org.apache.flink.runtime.rpc.RpcEndpoint;
+import org.apache.flink.runtime.rpc.RpcService;
+import scala.concurrent.ExecutionContext;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * TaskExecutor implementation. The task executor is responsible for the execution of multiple
+ * {@link org.apache.flink.runtime.taskmanager.Task}.
+ *
+ * It offers the following methods as part of its rpc interface to interact with it remotely:
+ * <ul>
+ *     <li>{@link #executeTask(TaskDeploymentDescriptor)} executes a given task on the TaskExecutor</li>
+ *     <li>{@link #cancelTask(ExecutionAttemptID)} cancels a given task identified by the {@link ExecutionAttemptID}</li>
+ * </ul>
+ *
+ * In its current form the class only records the execution attempt IDs of the tasks handed to it.
+ */
+public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
+       // Execution context derived from the executor service passed to the constructor.
+       private final ExecutionContext executionContext;
+       // Execution attempt IDs of all tasks passed to executeTask so far.
+       private final Set<ExecutionAttemptID> tasks = new HashSet<>();
+
+       /**
+        * Creates a task executor.
+        *
+        * @param rpcService Rpc service which is passed on to the {@link RpcEndpoint} super class
+        * @param executorService Executor service which is wrapped into an {@link ExecutionContext}
+        */
+       public TaskExecutor(RpcService rpcService, ExecutorService 
executorService) {
+               super(rpcService);
+               this.executionContext = 
ExecutionContexts$.MODULE$.fromExecutor(executorService);
+       }
+
+       /**
+        * Execute the given task on the task executor. The task is described by the provided
+        * {@link TaskDeploymentDescriptor}.
+        *
+        * The execution attempt ID of the descriptor is remembered so that a later
+        * {@link #cancelTask(ExecutionAttemptID)} call can find the task.
+        *
+        * @param taskDeploymentDescriptor Descriptor for the task to be executed
+        * @return Acknowledge the start of the task execution
+        */
+       @RpcMethod
+       public Acknowledge executeTask(TaskDeploymentDescriptor 
taskDeploymentDescriptor) {
+               tasks.add(taskDeploymentDescriptor.getExecutionId());
+               return Acknowledge.get();
+       }
+
+       /**
+        * Cancel a task identified by its {@link ExecutionAttemptID}. If the task cannot be found, then
+        * the method throws an {@link Exception}.
+        *
+        * Note that this method only checks whether the ID is known; it does not remove the ID from
+        * the set of tracked tasks.
+        *
+        * @param executionAttemptId Execution attempt ID identifying the task to be canceled.
+        * @return Acknowledge the task canceling
+        * @throws Exception if the task with the given execution attempt id could not be found
+        */
+       @RpcMethod
+       public Acknowledge cancelTask(ExecutionAttemptID executionAttemptId) 
throws Exception {
+               if (tasks.contains(executionAttemptId)) {
+                       return Acknowledge.get();
+               } else {
+                       throw new Exception("Could not find task.");
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/04bcb715/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorGateway.java
new file mode 100644
index 0000000..450423e
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorGateway.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.taskexecutor;
+
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.rpc.RpcGateway;
+import scala.concurrent.Future;
+
+/**
+ * {@link TaskExecutor} rpc gateway interface.
+ */
+public interface TaskExecutorGateway extends RpcGateway {
+       /**
+        * Execute the given task on the task executor. The task is described by the provided
+        * {@link TaskDeploymentDescriptor}.
+        *
+        * @param taskDeploymentDescriptor Descriptor for the task to be executed
+        * @return Future acknowledge of the start of the task execution
+        */
+       Future<Acknowledge> executeTask(TaskDeploymentDescriptor 
taskDeploymentDescriptor);
+
+       /**
+        * Cancel a task identified by its {@link ExecutionAttemptID}. If the task cannot be found, then
+        * the method throws an {@link Exception}.
+        *
+        * @param executionAttemptId Execution attempt ID identifying the task to be canceled.
+        * @return Future acknowledge of the task canceling
+        */
+       Future<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptId);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/04bcb715/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
new file mode 100644
index 0000000..0ded25e
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc;
+
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+import org.reflections.Reflections;
+import scala.concurrent.Future;
+
+import java.lang.annotation.Annotation;
+import java.lang.reflect.Method;
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class RpcCompletenessTest extends TestLogger {
+       private static final Class<?> futureClass = Future.class;
+
+       /**
+        * Scans all {@link RpcEndpoint} sub types below the package org.apache.flink and verifies for
+        * each of them that its rpc methods and the methods of its rpc gateway match. The gateway
+        * type is taken from the first type argument of the endpoint's generic super class.
+        */
+       @Test
+       public void testRpcCompleteness() {
+               Set<Class<? extends RpcEndpoint>> endpointClasses =
+                       new Reflections("org.apache.flink").getSubTypesOf(RpcEndpoint.class);
+
+               for (Class<? extends RpcEndpoint> rpcEndpoint : endpointClasses) {
+                       Type genericSuperClass = rpcEndpoint.getGenericSuperclass();
+                       Class<?> rpcGatewayType = extractTypeParameter(genericSuperClass, 0);
+
+                       if (rpcGatewayType == null) {
+                               fail("Could not retrieve the rpc gateway class for the given rpc endpoint class " + rpcEndpoint.getName());
+                       } else {
+                               checkCompleteness(rpcEndpoint, (Class<? extends RpcGateway>) rpcGatewayType);
+                       }
+               }
+       }
+
+       /**
+        * Verifies that the {@link RpcMethod} annotated methods declared by the rpc endpoint and the
+        * methods declared by the rpc gateway match each other: every gateway method needs a matching
+        * endpoint method and no annotated endpoint method may stay unmatched.
+        *
+        * @param rpcEndpoint Rpc endpoint class to check
+        * @param rpcGateway Rpc gateway class belonging to the endpoint
+        */
+       private void checkCompleteness(Class<? extends RpcEndpoint> rpcEndpoint, Class<? extends RpcGateway> rpcGateway) {
+               Map<String, Set<Method>> rpcMethodsByName = new HashMap<>();
+               Set<Method> unmatchedRpcMethods = new HashSet<>();
+
+               // group the annotated endpoint methods by name; initially all of them are unmatched
+               for (Method candidate : rpcEndpoint.getDeclaredMethods()) {
+                       if (!candidate.isAnnotationPresent(RpcMethod.class)) {
+                               continue;
+                       }
+
+                       Set<Method> overloads = rpcMethodsByName.get(candidate.getName());
+
+                       if (overloads == null) {
+                               overloads = new HashSet<>();
+                               rpcMethodsByName.put(candidate.getName(), overloads);
+                       }
+
+                       overloads.add(candidate);
+                       unmatchedRpcMethods.add(candidate);
+               }
+
+               for (Method gatewayMethod : rpcGateway.getDeclaredMethods()) {
+                       String methodName = gatewayMethod.getName();
+
+                       assertTrue(
+                               "The rpc endpoint " + rpcEndpoint.getName() + " does not contain a RpcMethod " +
+                                       "annotated method with the same name and signature " +
+                                       generateEndpointMethodSignature(gatewayMethod) + ".",
+                               rpcMethodsByName.containsKey(methodName));
+
+                       checkGatewayMethod(gatewayMethod);
+
+                       boolean matched = matchGatewayMethodWithEndpoint(
+                               gatewayMethod,
+                               rpcMethodsByName.get(methodName),
+                               unmatchedRpcMethods);
+
+                       if (!matched) {
+                               fail("Could not find a RpcMethod annotated method in rpc endpoint " +
+                                       rpcEndpoint.getName() + " matching the rpc gateway method " +
+                                       generateEndpointMethodSignature(gatewayMethod) + " defined in the rpc gateway " +
+                                       rpcGateway.getName() + ".");
+                       }
+               }
+
+               if (unmatchedRpcMethods.isEmpty()) {
+                       return;
+               }
+
+               // report all endpoint rpc methods which have no counterpart in the gateway
+               StringBuilder unmatchedList = new StringBuilder();
+
+               for (Method unmatchedRpcMethod : unmatchedRpcMethods) {
+                       unmatchedList.append(unmatchedRpcMethod).append("\n");
+               }
+
+               fail("The rpc endpoint " + rpcEndpoint.getName() + " contains rpc methods which " +
+                       "are not matched to gateway methods of " + rpcGateway.getName() + ":\n" +
+                       unmatchedList.toString());
+       }
+
+       /**
+        * Checks whether the gateway method fulfills the gateway method requirements.
+        * <ul>
+        *     <li>It checks whether the return type is void or a {@link Future} wrapping the actual result. </li>
+        *     <li>It checks that the method's parameter list contains at most one parameter annotated with {@link RpcTimeout}.</li>
+        * </ul>
+        *
+        * @param gatewayMethod Gateway method to check
+        */
+       private void checkGatewayMethod(Method gatewayMethod) {
+               if (!gatewayMethod.getReturnType().equals(Void.TYPE)) {
+                       assertTrue(
+                               "The return type of method " + gatewayMethod.getName() + " in the rpc gateway " +
+                                       gatewayMethod.getDeclaringClass().getName() + " is non void and not a " +
+                                       "future. Non-void return types have to be returned as a future.",
+                               gatewayMethod.getReturnType().equals(futureClass));
+               }
+
+               int rpcTimeoutParameters = 0;
+
+               // An annotation instance is never equal to a Class object, so the annotation type has
+               // to be compared instead; isRpcTimeout does exactly that for one parameter.
+               for (Annotation[] parameterAnnotation : gatewayMethod.getParameterAnnotations()) {
+                       if (isRpcTimeout(parameterAnnotation)) {
+                               rpcTimeoutParameters++;
+                       }
+               }
+
+               assertTrue("The gateway method " + gatewayMethod + " must have at most one RpcTimeout " +
+                       "annotated parameter.", rpcTimeoutParameters <= 1);
+       }
+
+       /**
+        * Searches the rpc endpoint methods sharing the gateway method's name for an overload which
+        * matches the gateway method. The first match is removed from the set of unmatched rpc methods.
+        *
+        * @param gatewayMethod Gateway method
+        * @param endpointMethods Set of rpc methods on the rpc endpoint with the same name as the gateway
+        *                   method
+        * @param unmatchedRpcMethods Set of unmatched rpc methods on the endpoint side (so far)
+        * @return True if a matching endpoint method was found; otherwise false
+        */
+       private boolean matchGatewayMethodWithEndpoint(Method gatewayMethod, Set<Method> endpointMethods, Set<Method> unmatchedRpcMethods) {
+               Method match = null;
+
+               for (Method candidate : endpointMethods) {
+                       if (checkMethod(gatewayMethod, candidate)) {
+                               match = candidate;
+                               break;
+                       }
+               }
+
+               if (match == null) {
+                       return false;
+               }
+
+               unmatchedRpcMethods.remove(match);
+               return true;
+       }
+
+       /**
+        * Checks whether the given endpoint method is the server side counterpart of the given
+        * gateway method. Both methods match if
+        * <ul>
+        *     <li>they have the same name,</li>
+        *     <li>the gateway parameters without the {@link RpcTimeout} annotated ones have the same
+        *     types as the endpoint parameters, and</li>
+        *     <li>both return void, or the gateway returns a {@link Future} whose value type equals
+        *     the endpoint's return type (respectively the value type of the endpoint's future).</li>
+        * </ul>
+        *
+        * Value types which cannot be resolved to a class (e.g. wildcards or type variables) are not
+        * compared. Types are compared with {@code checkType}, which does not box primitives.
+        *
+        * @param gatewayMethod Gateway method
+        * @param endpointMethod Rpc endpoint method to compare against
+        * @return True if the endpoint method matches the gateway method; otherwise false
+        */
+       private boolean checkMethod(Method gatewayMethod, Method endpointMethod) {
+               if (!gatewayMethod.getName().equals(endpointMethod.getName())) {
+                       return false;
+               }
+
+               Class<?>[] gatewayParameterTypes = gatewayMethod.getParameterTypes();
+               Annotation[][] gatewayParameterAnnotations = gatewayMethod.getParameterAnnotations();
+
+               assertEquals(gatewayParameterTypes.length, gatewayParameterAnnotations.length);
+
+               // filter out the RpcTimeout parameters
+               List<Class<?>> filteredGatewayParameterTypes = new ArrayList<>();
+
+               for (int i = 0; i < gatewayParameterTypes.length; i++) {
+                       if (!isRpcTimeout(gatewayParameterAnnotations[i])) {
+                               filteredGatewayParameterTypes.add(gatewayParameterTypes[i]);
+                       }
+               }
+
+               Class<?>[] endpointParameterTypes = endpointMethod.getParameterTypes();
+
+               if (filteredGatewayParameterTypes.size() != endpointParameterTypes.length) {
+                       return false;
+               }
+
+               // check the parameter types
+               for (int i = 0; i < endpointParameterTypes.length; i++) {
+                       if (!checkType(filteredGatewayParameterTypes.get(i), endpointParameterTypes[i])) {
+                               return false;
+                       }
+               }
+
+               // check the return types
+               Class<?> gatewayReturnType = gatewayMethod.getReturnType();
+               Class<?> endpointReturnType = endpointMethod.getReturnType();
+
+               if (endpointReturnType == void.class) {
+                       return gatewayReturnType == void.class;
+               }
+
+               // has return value. The gateway method must wrap it in a future
+               if (!gatewayReturnType.equals(futureClass)) {
+                       return false;
+               }
+
+               // The type argument is only available from the generic return type; the raw class
+               // returned by getReturnType() carries no type arguments.
+               Class<?> valueClass = extractTypeParameter(gatewayMethod.getGenericReturnType(), 0);
+
+               if (endpointReturnType.equals(futureClass)) {
+                       Class<?> rpcEndpointValueClass = extractTypeParameter(endpointMethod.getGenericReturnType(), 0);
+
+                       // check if we have the same future value types
+                       if (valueClass != null && rpcEndpointValueClass != null && !checkType(valueClass, rpcEndpointValueClass)) {
+                               return false;
+                       }
+               } else {
+                       if (valueClass != null && !checkType(valueClass, endpointReturnType)) {
+                               return false;
+                       }
+               }
+
+               return true;
+       }
+
+       /**
+        * Compares two types for equality.
+        *
+        * @param firstType First type
+        * @param secondType Second type
+        * @return True if both classes are equal; otherwise false
+        */
+       private boolean checkType(Class<?> firstType, Class<?> secondType) {
+               final boolean sameType = firstType.equals(secondType);
+
+               return sameType;
+       }
+
+       /**
+        * Generates from a gateway rpc method signature the corresponding rpc endpoint signature.
+        *
+        * For example the {@link RpcTimeout} annotation adds an additional parameter to the gateway
+        * signature which is not relevant on the server side.
+        *
+        * For a gateway method returning a future, the return type is rendered as
+        * {@code Future<Value>/Value}.
+        *
+        * @param method Method to generate the signature string for
+        * @return String of the respective server side rpc method signature
+        */
+       private String generateEndpointMethodSignature(Method method) {
+               StringBuilder builder = new StringBuilder();
+
+               if (method.getReturnType().equals(Void.TYPE)) {
+                       builder.append("void").append(" ");
+               } else if (method.getReturnType().equals(futureClass)) {
+                       Class<?> valueClass = extractTypeParameter(method.getGenericReturnType(), 0);
+
+                       builder
+                               .append(futureClass.getSimpleName())
+                               .append("<")
+                               .append(valueClass != null ? valueClass.getSimpleName() : "")
+                               .append(">");
+
+                       if (valueClass != null) {
+                               builder.append("/").append(valueClass.getSimpleName());
+                       }
+
+                       builder.append(" ");
+               } else {
+                       return "Invalid rpc method signature.";
+               }
+
+               builder.append(method.getName()).append("(");
+
+               Class<?>[] parameterTypes = method.getParameterTypes();
+               Annotation[][] parameterAnnotations = method.getParameterAnnotations();
+
+               assertEquals(parameterTypes.length, parameterAnnotations.length);
+
+               // The separator is written in front of every printed parameter but the first one.
+               // Deciding it by the parameter index would leave a dangling ", " whenever a filtered
+               // RpcTimeout parameter follows a printed one.
+               boolean firstParameter = true;
+
+               for (int i = 0; i < parameterTypes.length; i++) {
+                       // filter out the RpcTimeout parameters
+                       if (isRpcTimeout(parameterAnnotations[i])) {
+                               continue;
+                       }
+
+                       if (!firstParameter) {
+                               builder.append(", ");
+                       }
+
+                       builder.append(parameterTypes[i].getName());
+                       firstParameter = false;
+               }
+
+               builder.append(")");
+
+               return builder.toString();
+       }
+
+       /**
+        * Extracts the type argument at the given position from a parameterized type.
+        *
+        * @param genericType Type to extract the type argument from
+        * @param position Index of the type argument
+        * @return The type argument if it is a class; null if the given type is not parameterized or
+        * the type argument is not a class
+        * @throws IndexOutOfBoundsException if the position is not a valid type argument index
+        */
+       private Class<?> extractTypeParameter(Type genericType, int position) {
+               if (!(genericType instanceof ParameterizedType)) {
+                       return null;
+               }
+
+               ParameterizedType parameterized = (ParameterizedType) genericType;
+               Type[] arguments = parameterized.getActualTypeArguments();
+
+               if (position < 0 || position >= arguments.length) {
+                       throw new IndexOutOfBoundsException("The generic type " +
+                               parameterized.getRawType() + " only has " + arguments.length +
+                               " type arguments.");
+               }
+
+               Type argument = arguments[position];
+
+               return (argument instanceof Class<?>) ? (Class<?>) argument : null;
+       }
+
+       /**
+        * Checks whether the given annotations contain the {@link RpcTimeout} annotation.
+        *
+        * @param annotations Annotations of a single method parameter
+        * @return True if one of the annotations is of type {@link RpcTimeout}; otherwise false
+        */
+       private boolean isRpcTimeout(Annotation[] annotations) {
+               boolean found = false;
+
+               for (int i = 0; i < annotations.length && !found; i++) {
+                       found = annotations[i].annotationType().equals(RpcTimeout.class);
+               }
+
+               return found;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/04bcb715/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
new file mode 100644
index 0000000..c5bac94
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka;
+
+import akka.actor.ActorSystem;
+import akka.util.Timeout;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.rpc.jobmaster.JobMaster;
+import org.apache.flink.runtime.rpc.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rpc.resourcemanager.ResourceManager;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class AkkaRpcServiceTest extends TestLogger {
+
+       /**
+        * Tests that the {@link JobMaster} can connect to the {@link 
ResourceManager} using the
+        * {@link AkkaRpcService}.
+        */
+       @Test
+       public void testJobMasterResourceManagerRegistration() throws Exception 
{
+               Timeout akkaTimeout = new Timeout(10, TimeUnit.SECONDS);
+               ActorSystem actorSystem = AkkaUtils.createDefaultActorSystem();
+               ActorSystem actorSystem2 = AkkaUtils.createDefaultActorSystem();
+               AkkaRpcService akkaRpcService = new AkkaRpcService(actorSystem, 
akkaTimeout);
+               AkkaRpcService akkaRpcService2 = new 
AkkaRpcService(actorSystem2, akkaTimeout);
+               ExecutorService executorService = new ForkJoinPool();
+
+               ResourceManager resourceManager = new 
ResourceManager(akkaRpcService, executorService);
+               JobMaster jobMaster = new JobMaster(akkaRpcService2, 
executorService);
+
+               resourceManager.start();
+
+               ResourceManagerGateway rm = resourceManager.getSelf();
+
+               assertTrue(rm instanceof AkkaGateway);
+
+               AkkaGateway akkaClient = (AkkaGateway) rm;
+
+               jobMaster.start();
+               
jobMaster.registerAtResourceManager(AkkaUtils.getAkkaURL(actorSystem, 
akkaClient.getActorRef()));
+
+               // wait for successful registration
+               FiniteDuration timeout = new FiniteDuration(20, 
TimeUnit.SECONDS);
+               Deadline deadline = timeout.fromNow();
+
+               while (deadline.hasTimeLeft() && !jobMaster.isConnected()) {
+                       Thread.sleep(100);
+               }
+
+               assertFalse(deadline.isOverdue());
+
+               jobMaster.shutDown();
+               resourceManager.shutDown();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/04bcb715/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorTest.java
new file mode 100644
index 0000000..c143527
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.taskexecutor;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.util.DirectExecutorService;
+import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import java.net.URL;
+import java.util.Collections;
+
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+
+public class TaskExecutorTest extends TestLogger {
+
+       /**
+        * Deploys a task on the {@link TaskExecutor} and cancels it again. The test passes if
+        * neither of the two calls throws an exception.
+        */
+       @Test
+       public void testTaskExecution() throws Exception {
+               TaskExecutor taskExecutor = createTaskExecutor();
+
+               TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(
+                       new JobID(),
+                       "Test job",
+                       new JobVertexID(),
+                       new ExecutionAttemptID(),
+                       new SerializedValue<ExecutionConfig>(null),
+                       "Test task",
+                       0,
+                       1,
+                       0,
+                       new Configuration(),
+                       new Configuration(),
+                       "Invokable",
+                       Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
+                       Collections.<InputGateDeploymentDescriptor>emptyList(),
+                       Collections.<BlobKey>emptyList(),
+                       Collections.<URL>emptyList(),
+                       0
+               );
+
+               // the acknowledgements are not inspected; only the absence of exceptions matters
+               Acknowledge deployAck = taskExecutor.executeTask(tdd);
+
+               Acknowledge cancelAck = taskExecutor.cancelTask(tdd.getExecutionId());
+       }
+
+       /**
+        * Cancels a task which was never deployed and expects the {@link TaskExecutor} to answer
+        * with an exception.
+        */
+       @Test(expected=Exception.class)
+       public void testWrongTaskCancellation() throws Exception {
+               TaskExecutor taskExecutor = createTaskExecutor();
+
+               taskExecutor.cancelTask(new ExecutionAttemptID());
+
+               fail("The cancellation should have thrown an exception.");
+       }
+
+       /**
+        * Creates the {@link TaskExecutor} under test, backed by a mocked {@link RpcService} and
+        * without an executor service (null is passed).
+        */
+       private static TaskExecutor createTaskExecutor() {
+               RpcService testingRpcService = mock(RpcService.class);
+               DirectExecutorService directExecutorService = null;
+
+               return new TaskExecutor(testingRpcService, directExecutorService);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/04bcb715/flink-runtime/src/test/java/org/apache/flink/runtime/util/DirectExecutorService.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/DirectExecutorService.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/DirectExecutorService.java
new file mode 100644
index 0000000..1d7c971
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/DirectExecutorService.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * {@link ExecutorService} that runs every task synchronously in the thread that submits it.
+ *
+ * <p>Consequently, all futures handed out by this service are already done when they are
+ * returned. Shutting the service down only flips a flag: tasks submitted after a shutdown are
+ * still executed.
+ */
+public class DirectExecutorService implements ExecutorService {
+       /** Shutdown flag; volatile so that a shutdown issued by one thread is visible to others. */
+       private volatile boolean stopped = false;
+
+       @Override
+       public void shutdown() {
+               stopped = true;
+       }
+
+       /**
+        * Marks the service as shut down.
+        *
+        * @return Always an empty list, because tasks are run immediately and never queued
+        */
+       @Override
+       public List<Runnable> shutdownNow() {
+               stopped = true;
+               return Collections.emptyList();
+       }
+
+       @Override
+       public boolean isShutdown() {
+               return stopped;
+       }
+
+       @Override
+       public boolean isTerminated() {
+               return stopped;
+       }
+
+       /**
+        * Returns immediately without waiting, since there are no pending tasks to wait for.
+        *
+        * @return True if the service has been shut down, false otherwise
+        */
+       @Override
+       public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+               return stopped;
+       }
+
+       /**
+        * Runs the task in the calling thread.
+        *
+        * @return Completed future holding the task's result, or the exception it has thrown
+        */
+       @Override
+       public <T> Future<T> submit(Callable<T> task) {
+               try {
+                       T result = task.call();
+
+                       return new CompletedFuture<>(result, null);
+               } catch (Exception e) {
+                       return new CompletedFuture<>(null, e);
+               }
+       }
+
+       /**
+        * Runs the task in the calling thread.
+        *
+        * @return Completed future holding {@code result}, or the exception thrown by the task
+        */
+       @Override
+       public <T> Future<T> submit(Runnable task, T result) {
+               try {
+                       task.run();
+
+                       return new CompletedFuture<>(result, null);
+               } catch (Exception e) {
+                       // report failures via the future, exactly like submit(Callable) does
+                       return new CompletedFuture<>(null, e);
+               }
+       }
+
+       /**
+        * Runs the task in the calling thread.
+        *
+        * @return Completed future holding {@code null}, or the exception thrown by the task
+        */
+       @Override
+       public Future<?> submit(Runnable task) {
+               return submit(task, null);
+       }
+
+       /**
+        * Runs all tasks one after the other in the calling thread.
+        *
+        * @return One completed future per task, in iteration order of {@code tasks}
+        */
+       @Override
+       public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
+               List<Future<T>> result = new ArrayList<>(tasks.size());
+
+               for (Callable<T> task : tasks) {
+                       result.add(submit(task));
+               }
+
+               return result;
+       }
+
+       /**
+        * Runs the tasks one after the other in the calling thread until the timeout has elapsed.
+        * A task that is already running is not interrupted; the timeout is only checked before
+        * the next task is started.
+        *
+        * @return One future per task, in iteration order of {@code tasks}. Tasks which were not
+        *         started in time are represented by a cancelled (and thus done) future.
+        */
+       @Override
+       public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
+               long end = System.currentTimeMillis() + unit.toMillis(timeout);
+               Iterator<? extends Callable<T>> iterator = tasks.iterator();
+               List<Future<T>> result = new ArrayList<>(tasks.size());
+
+               while (end > System.currentTimeMillis() && iterator.hasNext()) {
+                       Callable<T> callable = iterator.next();
+
+                       result.add(submit(callable));
+               }
+
+               // everything that could not be started before the deadline counts as cancelled
+               while (iterator.hasNext()) {
+                       iterator.next();
+                       result.add(new CancelledFuture<T>());
+               }
+
+               return result;
+       }
+
+       /**
+        * Runs the tasks one after the other in the calling thread and returns the result of the
+        * first one which completes without an exception. Later tasks are not executed.
+        *
+        * @throws ExecutionException If no task completed successfully; the cause is the exception
+        *                            of the last task that was tried (null if there were no tasks)
+        */
+       @Override
+       public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
+               Exception exception = null;
+
+               for (Callable<T> task : tasks) {
+                       try {
+                               return task.call();
+                       } catch (Exception e) {
+                               // try next task
+                               exception = e;
+                       }
+               }
+
+               throw new ExecutionException("No tasks finished successfully.", exception);
+       }
+
+       /**
+        * Like {@link #invokeAny(Collection)}, but stops starting further tasks once the timeout
+        * has elapsed.
+        *
+        * @throws TimeoutException If the timeout elapsed while there were still untried tasks
+        * @throws ExecutionException If all tasks were tried and none completed successfully
+        */
+       @Override
+       public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+               long end = System.currentTimeMillis() + unit.toMillis(timeout);
+               Exception exception = null;
+
+               Iterator<? extends Callable<T>> iterator = tasks.iterator();
+
+               while (end > System.currentTimeMillis() && iterator.hasNext()) {
+                       Callable<T> callable = iterator.next();
+
+                       try {
+                               return callable.call();
+                       } catch (Exception e) {
+                               // ignore exception and try next
+                               exception = e;
+                       }
+               }
+
+               if (iterator.hasNext()) {
+                       throw new TimeoutException("Could not finish execution of tasks within time.");
+               } else {
+                       throw new ExecutionException("No tasks finished successfully.", exception);
+               }
+       }
+
+       /**
+        * Runs the command in the calling thread. Exceptions thrown by the command propagate to
+        * the caller.
+        */
+       @Override
+       public void execute(Runnable command) {
+               command.run();
+       }
+
+       /**
+        * Future which is already completed, either with a value or with an exception.
+        *
+        * @param <V> Type of the value
+        */
+       public static class CompletedFuture<V> implements Future<V> {
+               private final V value;
+               private final Exception exception;
+
+               /**
+                * @param value Result to return from {@link #get()}; ignored if an exception is given
+                * @param exception Failure cause, or null if the computation succeeded
+                */
+               public CompletedFuture(V value, Exception exception) {
+                       this.value = value;
+                       this.exception = exception;
+               }
+
+               @Override
+               public boolean cancel(boolean mayInterruptIfRunning) {
+                       return false;
+               }
+
+               @Override
+               public boolean isCancelled() {
+                       return false;
+               }
+
+               @Override
+               public boolean isDone() {
+                       return true;
+               }
+
+               @Override
+               public V get() throws InterruptedException, ExecutionException {
+                       if (exception != null) {
+                               throw new ExecutionException(exception);
+                       } else {
+                               return value;
+                       }
+               }
+
+               @Override
+               public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+                       return get();
+               }
+       }
+
+       /**
+        * Future of a task which was cancelled before it was started. As required by the
+        * {@link Future} contract, a cancelled future also reports itself as done.
+        *
+        * @param <V> Type of the value the task would have produced
+        */
+       private static final class CancelledFuture<V> implements Future<V> {
+
+               @Override
+               public boolean cancel(boolean mayInterruptIfRunning) {
+                       // already cancelled, nothing left to cancel
+                       return false;
+               }
+
+               @Override
+               public boolean isCancelled() {
+                       return true;
+               }
+
+               @Override
+               public boolean isDone() {
+                       return true;
+               }
+
+               @Override
+               public V get() throws InterruptedException, ExecutionException {
+                       throw new CancellationException("Task has been cancelled.");
+               }
+
+               @Override
+               public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+                       throw new CancellationException("Task has been cancelled.");
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/04bcb715/flink-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-tests/pom.xml b/flink-tests/pom.xml
index b09db1f..3202a9f 100644
--- a/flink-tests/pom.xml
+++ b/flink-tests/pom.xml
@@ -202,7 +202,6 @@ under the License.
                <dependency>
                        <groupId>org.reflections</groupId>
                        <artifactId>reflections</artifactId>
-                       <version>0.9.10</version>
                </dependency>
 
        </dependencies>

http://git-wip-us.apache.org/repos/asf/flink/blob/04bcb715/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 5b3148a..5d7cf91 100644
--- a/pom.xml
+++ b/pom.xml
@@ -413,6 +413,13 @@ under the License.
                                <artifactId>jackson-annotations</artifactId>
                                <version>${jackson.version}</version>
                        </dependency>
+
+                       <dependency>
+                               <groupId>org.reflections</groupId>
+                               <artifactId>reflections</artifactId>
+                               <version>0.9.10</version>
+                               <scope>test</scope>
+                       </dependency>
                </dependencies>
        </dependencyManagement>
 

Reply via email to