[FLINK-4346] [rpc] Add new RPC abstraction

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/04bcb715
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/04bcb715
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/04bcb715

Branch: refs/heads/flip-6
Commit: 04bcb71576f1f8261beb4c4cffb65b82a1382864
Parents: 68709b0
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Wed Aug 3 19:31:34 2016 +0200
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Wed Sep 21 11:39:12 2016 +0200

----------------------------------------------------------------------
 flink-runtime/pom.xml                           |   5 +
 .../flink/runtime/rpc/MainThreadExecutor.java   |  54 +++
 .../apache/flink/runtime/rpc/RpcEndpoint.java   | 182 +++++++++++
 .../apache/flink/runtime/rpc/RpcGateway.java    |  25 ++
 .../org/apache/flink/runtime/rpc/RpcMethod.java |  35 ++
 .../apache/flink/runtime/rpc/RpcService.java    |  74 +++++
 .../apache/flink/runtime/rpc/RpcTimeout.java    |  34 ++
 .../flink/runtime/rpc/akka/AkkaGateway.java     |  29 ++
 .../flink/runtime/rpc/akka/AkkaRpcService.java  | 145 ++++++++
 .../flink/runtime/rpc/akka/BaseAkkaActor.java   |  50 +++
 .../flink/runtime/rpc/akka/BaseAkkaGateway.java |  41 +++
 .../rpc/akka/jobmaster/JobMasterAkkaActor.java  |  58 ++++
 .../akka/jobmaster/JobMasterAkkaGateway.java    |  57 ++++
 .../rpc/akka/messages/CallableMessage.java      |  33 ++
 .../runtime/rpc/akka/messages/CancelTask.java   |  36 ++
 .../runtime/rpc/akka/messages/ExecuteTask.java  |  36 ++
 .../messages/RegisterAtResourceManager.java     |  36 ++
 .../rpc/akka/messages/RegisterJobMaster.java    |  36 ++
 .../runtime/rpc/akka/messages/RequestSlot.java  |  37 +++
 .../rpc/akka/messages/RunnableMessage.java      |  31 ++
 .../akka/messages/UpdateTaskExecutionState.java |  37 +++
 .../ResourceManagerAkkaActor.java               |  65 ++++
 .../ResourceManagerAkkaGateway.java             |  67 ++++
 .../taskexecutor/TaskExecutorAkkaActor.java     |  77 +++++
 .../taskexecutor/TaskExecutorAkkaGateway.java   |  59 ++++
 .../flink/runtime/rpc/jobmaster/JobMaster.java  | 249 ++++++++++++++
 .../runtime/rpc/jobmaster/JobMasterGateway.java |  45 +++
 .../resourcemanager/JobMasterRegistration.java  |  35 ++
 .../resourcemanager/RegistrationResponse.java   |  43 +++
 .../rpc/resourcemanager/ResourceManager.java    |  94 ++++++
 .../resourcemanager/ResourceManagerGateway.java |  58 ++++
 .../rpc/resourcemanager/SlotAssignment.java     |  25 ++
 .../rpc/resourcemanager/SlotRequest.java        |  25 ++
 .../runtime/rpc/taskexecutor/TaskExecutor.java  |  82 +++++
 .../rpc/taskexecutor/TaskExecutorGateway.java   |  48 +++
 .../flink/runtime/rpc/RpcCompletenessTest.java  | 327 +++++++++++++++++++
 .../runtime/rpc/akka/AkkaRpcServiceTest.java    |  81 +++++
 .../rpc/taskexecutor/TaskExecutorTest.java      |  92 ++++++
 .../runtime/util/DirectExecutorService.java     | 234 +++++++++++++
 flink-tests/pom.xml                             |   1 -
 pom.xml                                         |   7 +
 41 files changed, 2784 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/04bcb715/flink-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml
index 5fea8fb..09c6fd0 100644
--- a/flink-runtime/pom.xml
+++ b/flink-runtime/pom.xml
@@ -189,6 +189,11 @@ under the License.
                        
<artifactId>akka-testkit_${scala.binary.version}</artifactId>
                </dependency>
 
+               <dependency>
+                       <groupId>org.reflections</groupId>
+                       <artifactId>reflections</artifactId>
+               </dependency>
+
        </dependencies>
 
        <build>

http://git-wip-us.apache.org/repos/asf/flink/blob/04bcb715/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutor.java
new file mode 100644
index 0000000..e06711e
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutor.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc;
+
+import akka.util.Timeout;
+import scala.concurrent.Future;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Interface to execute {@link Runnable} and {@link Callable} in the main 
thread of the underlying
+ * rpc server.
+ *
+ * This interface is intended to be implemented by the self gateway in a 
{@link RpcEndpoint}
+ * implementation which allows to dispatch local procedures to the main thread 
of the underlying
+ * rpc server.
+ */
+public interface MainThreadExecutor {
+       /**
+        * Execute the runnable in the main thread of the underlying rpc server.
+        *
+        * @param runnable Runnable to be executed
+        */
+       void runAsync(Runnable runnable);
+
+       /**
+        * Execute the callable in the main thread of the underlying rpc server 
and return a future for
+        * the callable result. If the future is not completed within the given 
timeout, the returned
+        * future will throw a {@link TimeoutException}.
+        *
+        * @param callable Callable to be executed
+        * @param timeout Timeout for the future to complete
+        * @param <V> Return value of the callable
+        * @return Future of the callable result
+        */
+       <V> Future<V> callAsync(Callable<V> callable, Timeout timeout);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/04bcb715/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
new file mode 100644
index 0000000..3d8757f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc;
+
+import akka.util.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.ExecutionContext;
+import scala.concurrent.Future;
+
+import java.util.concurrent.Callable;
+
+/**
+ * Base class for rpc endpoints. Distributed components which offer remote 
procedure calls have to
+ * extend the rpc endpoint base class.
+ *
+ * The main idea is that a rpc endpoint is backed by a rpc server which has a 
single thread
+ * processing the rpc calls. Thus, by executing all state changing operations 
within the main
+ * thread, we don't have to reason about concurrent accesses. The rpc provides 
provides
+ * {@link #runAsync(Runnable)}, {@link #callAsync(Callable, Timeout)} and the
+ * {@link #getMainThreadExecutionContext()} to execute code in the rpc 
server's main thread.
+ *
+ * @param <C> Rpc gateway counterpart for the implementing rpc endpoint
+ */
+public abstract class RpcEndpoint<C extends RpcGateway> {
+
+       protected final Logger log = LoggerFactory.getLogger(getClass());
+
+       /** Rpc service to be used to start the rpc server and to obtain rpc 
gateways */
+       private final RpcService rpcService;
+
+       /** Self gateway which can be used to schedule asynchronous calls on 
yourself */
+       private C self;
+
+       /**
+        * The main thread execution context to be used to execute future 
callbacks in the main thread
+        * of the executing rpc server.
+        *
+        * IMPORTANT: The main thread context is only available after the rpc 
server has been started.
+        */
+       private MainThreadExecutionContext mainThreadExecutionContext;
+
+       public RpcEndpoint(RpcService rpcService) {
+               this.rpcService = rpcService;
+       }
+
+       /**
+        * Get self-gateway which should be used to run asynchronous rpc calls 
on this endpoint.
+        *
+        * IMPORTANT: Always issue local method calls via the self-gateway if 
the current thread
+        * is not the main thread of the underlying rpc server, e.g. from 
within a future callback.
+        *
+        * @return Self gateway
+        */
+       public C getSelf() {
+               return self;
+       }
+
+       /**
+        * Execute the runnable in the main thread of the underlying rpc server.
+        *
+        * @param runnable Runnable to be executed in the main thread of the 
underlying rpc server
+        */
+       public void runAsync(Runnable runnable) {
+               ((MainThreadExecutor) self).runAsync(runnable);
+       }
+
+       /**
+        * Execute the callable in the main thread of the underlying rpc server 
returning a future for
+        * the result of the callable. If the callable is not completed within 
the given timeout, then
+        * the future will be failed with a {@link 
java.util.concurrent.TimeoutException}.
+        *
+        * @param callable Callable to be executed in the main thread of the 
underlying rpc server
+        * @param timeout Timeout for the callable to be completed
+        * @param <V> Return type of the callable
+        * @return Future for the result of the callable.
+        */
+       public <V> Future<V> callAsync(Callable<V> callable, Timeout timeout) {
+               return ((MainThreadExecutor) self).callAsync(callable, timeout);
+       }
+
+       /**
+        * Gets the main thread execution context. The main thread execution 
context can be used to
+        * execute tasks in the main thread of the underlying rpc server.
+        *
+        * @return Main thread execution context
+        */
+       public ExecutionContext getMainThreadExecutionContext() {
+               return mainThreadExecutionContext;
+       }
+
+       /**
+        * Gets the used rpc service.
+        *
+        * @return Rpc service
+        */
+       public RpcService getRpcService() {
+               return rpcService;
+       }
+
+       /**
+        * Starts the underlying rpc server via the rpc service and creates the 
main thread execution
+        * context. This makes the rpc endpoint effectively reachable from the 
outside.
+        *
+        * Can be overriden to add rpc endpoint specific start up code. Should 
always call the parent
+        * start method.
+        */
+       public void start() {
+               self = rpcService.startServer(this);
+               mainThreadExecutionContext = new 
MainThreadExecutionContext((MainThreadExecutor) self);
+       }
+
+
+       /**
+        * Shuts down the underlying rpc server via the rpc service.
+        *
+        * Can be overriden to add rpc endpoint specific shut down code. Should 
always call the parent
+        * shut down method.
+        */
+       public void shutDown() {
+               rpcService.stopServer(self);
+       }
+
+       /**
+        * Gets the address of the underlying rpc server. The address should be 
fully qualified so that
+        * a remote system can connect to this rpc server via this address.
+        *
+        * @return Fully qualified address of the underlying rpc server
+        */
+       public String getAddress() {
+               return rpcService.getAddress(self);
+       }
+
+       /**
+        * Execution context which executes runnables in the main thread 
context. A reported failure
+        * will cause the underlying rpc server to shut down.
+        */
+       private class MainThreadExecutionContext implements ExecutionContext {
+               private final MainThreadExecutor gateway;
+
+               MainThreadExecutionContext(MainThreadExecutor gateway) {
+                       this.gateway = gateway;
+               }
+
+               @Override
+               public void execute(Runnable runnable) {
+                       gateway.runAsync(runnable);
+               }
+
+               @Override
+               public void reportFailure(final Throwable t) {
+                       gateway.runAsync(new Runnable() {
+                               @Override
+                               public void run() {
+                                       log.error("Encountered failure in the 
main thread execution context.", t);
+                                       shutDown();
+                               }
+                       });
+               }
+
+               @Override
+               public ExecutionContext prepare() {
+                       return this;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/04bcb715/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcGateway.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcGateway.java
new file mode 100644
index 0000000..e3a16b4
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcGateway.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc;
+
+/**
+ * Rpc gateway interface which has to be implemented by Rpc gateways.
+ */
+public interface RpcGateway {
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/04bcb715/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcMethod.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcMethod.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcMethod.java
new file mode 100644
index 0000000..875e557
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcMethod.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Annotation for rpc method in a {@link RpcEndpoint} implementation. Every 
rpc method must have a
+ * respective counterpart in the {@link RpcGateway} implementation for this 
rpc server. The
+ * RpcCompletenessTest makes sure that the set of rpc methods in a rpc server 
and the set of
+ * gateway methods in the corresponding gateway implementation are identical.
+ */
+@Target(ElementType.METHOD)
+@Retention(RetentionPolicy.RUNTIME)
+public @interface RpcMethod {
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/04bcb715/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
new file mode 100644
index 0000000..90ff7b6
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc;
+
+import scala.concurrent.Future;
+
+/**
+ * Interface for rpc services. An rpc service is used to start and connect to 
a {@link RpcEndpoint}.
+ * Connecting to a rpc server will return a {@link RpcGateway} which can be 
used to call remote
+ * procedures.
+ */
+public interface RpcService {
+
+       /**
+        * Connect to a remote rpc server under the provided address. Returns a 
rpc gateway which can
+        * be used to communicate with the rpc server.
+        *
+        * @param address Address of the remote rpc server
+        * @param clazz Class of the rpc gateway to return
+        * @param <C> Type of the rpc gateway to return
+        * @return Future containing the rpc gateway
+        */
+       <C extends RpcGateway> Future<C> connect(String address, Class<C> 
clazz);
+
+       /**
+        * Start a rpc server which forwards the remote procedure calls to the 
provided rpc endpoint.
+        *
+        * @param rpcEndpoint Rpc protocl to dispath the rpcs to
+        * @param <S> Type of the rpc endpoint
+        * @param <C> Type of the self rpc gateway associated with the rpc 
server
+        * @return Self gateway to dispatch remote procedure calls to oneself
+        */
+       <S extends RpcEndpoint, C extends RpcGateway> C startServer(S 
rpcEndpoint);
+
+       /**
+        * Stop the underlying rpc server of the provided self gateway.
+        *
+        * @param selfGateway Self gateway describing the underlying rpc server
+        * @param <C> Type of the rpc gateway
+        */
+       <C extends RpcGateway> void stopServer(C selfGateway);
+
+       /**
+        * Stop the rpc service shutting down all started rpc servers.
+        */
+       void stopService();
+
+       /**
+        * Get the fully qualified address of the underlying rpc server 
represented by the self gateway.
+        * It must be possible to connect from a remote host to the rpc server 
via the returned fully
+        * qualified address.
+        *
+        * @param selfGateway Self gateway associated with the underlying rpc 
server
+        * @param <C> Type of the rpc gateway
+        * @return Fully qualified address
+        */
+       <C extends RpcGateway> String getAddress(C selfGateway);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/04bcb715/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcTimeout.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcTimeout.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcTimeout.java
new file mode 100644
index 0000000..3d36d47
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcTimeout.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Annotation for {@link RpcGateway} methods to specify an additional timeout 
parameter for the
+ * returned future to be completed. The rest of the provided parameters is 
passed to the remote rpc
+ * server for the rpc.
+ */
+@Target(ElementType.PARAMETER)
+@Retention(RetentionPolicy.RUNTIME)
+public @interface RpcTimeout {
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/04bcb715/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaGateway.java
new file mode 100644
index 0000000..a96a600
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaGateway.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka;
+
+import akka.actor.ActorRef;
+
+/**
+ * Interface for Akka based rpc gateways
+ */
+public interface AkkaGateway {
+
+       ActorRef getActorRef();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/04bcb715/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
new file mode 100644
index 0000000..d55bd13
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka;
+
+import akka.actor.ActorIdentity;
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import akka.actor.ActorSystem;
+import akka.actor.Identify;
+import akka.actor.PoisonPill;
+import akka.actor.Props;
+import akka.dispatch.Mapper;
+import akka.pattern.AskableActorSelection;
+import akka.util.Timeout;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.rpc.jobmaster.JobMaster;
+import org.apache.flink.runtime.rpc.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.rpc.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.rpc.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.runtime.rpc.RpcEndpoint;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.akka.jobmaster.JobMasterAkkaActor;
+import org.apache.flink.runtime.rpc.akka.jobmaster.JobMasterAkkaGateway;
+import 
org.apache.flink.runtime.rpc.akka.resourcemanager.ResourceManagerAkkaActor;
+import 
org.apache.flink.runtime.rpc.akka.resourcemanager.ResourceManagerAkkaGateway;
+import org.apache.flink.runtime.rpc.akka.taskexecutor.TaskExecutorAkkaActor;
+import org.apache.flink.runtime.rpc.akka.taskexecutor.TaskExecutorAkkaGateway;
+import org.apache.flink.runtime.rpc.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.rpc.taskexecutor.TaskExecutor;
+import scala.concurrent.Future;
+
+import java.util.HashSet;
+import java.util.Set;
+
+public class AkkaRpcService implements RpcService {
+       private final ActorSystem actorSystem;
+       private final Timeout timeout;
+       private final Set<ActorRef> actors = new HashSet<>();
+
+       public AkkaRpcService(ActorSystem actorSystem, Timeout timeout) {
+               this.actorSystem = actorSystem;
+               this.timeout = timeout;
+       }
+
+       @Override
+       public <C extends RpcGateway> Future<C> connect(String address, final 
Class<C> clazz) {
+               ActorSelection actorSel = actorSystem.actorSelection(address);
+
+               AskableActorSelection asker = new 
AskableActorSelection(actorSel);
+
+               Future<Object> identify = asker.ask(new Identify(42), timeout);
+
+               return identify.map(new Mapper<Object, C>(){
+                       public C apply(Object obj) {
+                               ActorRef actorRef = ((ActorIdentity) 
obj).getRef();
+
+                               if (clazz == TaskExecutorGateway.class) {
+                                       return (C) new 
TaskExecutorAkkaGateway(actorRef, timeout);
+                               } else if (clazz == 
ResourceManagerGateway.class) {
+                                       return (C) new 
ResourceManagerAkkaGateway(actorRef, timeout);
+                               } else if (clazz == JobMasterGateway.class) {
+                                       return (C) new 
JobMasterAkkaGateway(actorRef, timeout);
+                               } else {
+                                       throw new RuntimeException("Could not 
find remote endpoint " + clazz);
+                               }
+                       }
+               }, actorSystem.dispatcher());
+       }
+
+       @Override
+       public <S extends RpcEndpoint, C extends RpcGateway> C startServer(S 
rpcEndpoint) {
+               ActorRef ref;
+               C self;
+               if (rpcEndpoint instanceof TaskExecutor) {
+                       ref = actorSystem.actorOf(
+                               Props.create(TaskExecutorAkkaActor.class, 
rpcEndpoint)
+                       );
+
+                       self = (C) new TaskExecutorAkkaGateway(ref, timeout);
+               } else if (rpcEndpoint instanceof ResourceManager) {
+                       ref = actorSystem.actorOf(
+                               Props.create(ResourceManagerAkkaActor.class, 
rpcEndpoint)
+                       );
+
+                       self = (C) new ResourceManagerAkkaGateway(ref, timeout);
+               } else if (rpcEndpoint instanceof JobMaster) {
+                       ref = actorSystem.actorOf(
+                               Props.create(JobMasterAkkaActor.class, 
rpcEndpoint)
+                       );
+
+                       self = (C) new JobMasterAkkaGateway(ref, timeout);
+               } else {
+                       throw new RuntimeException("Could not start RPC server 
for class " + rpcEndpoint.getClass());
+               }
+
+               actors.add(ref);
+
+               return self;
+       }
+
+       @Override
+       public <C extends RpcGateway> void stopServer(C selfGateway) {
+               if (selfGateway instanceof AkkaGateway) {
+                       AkkaGateway akkaClient = (AkkaGateway) selfGateway;
+
+                       if (actors.contains(akkaClient.getActorRef())) {
+                               
akkaClient.getActorRef().tell(PoisonPill.getInstance(), ActorRef.noSender());
+                       } else {
+                               // don't stop this actor since it was not 
started by this RPC service
+                       }
+               }
+       }
+
+       @Override
+       public void stopService() {
+               actorSystem.shutdown();
+               actorSystem.awaitTermination();
+       }
+
+       @Override
+       public <C extends RpcGateway> String getAddress(C selfGateway) {
+               if (selfGateway instanceof AkkaGateway) {
+                       return AkkaUtils.getAkkaURL(actorSystem, ((AkkaGateway) 
selfGateway).getActorRef());
+               } else {
+                       throw new RuntimeException("Cannot get address for non 
" + AkkaGateway.class.getName() + ".");
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/04bcb715/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/BaseAkkaActor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/BaseAkkaActor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/BaseAkkaActor.java
new file mode 100644
index 0000000..3cb499c
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/BaseAkkaActor.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka;
+
+import akka.actor.Status;
+import akka.actor.UntypedActor;
+import org.apache.flink.runtime.rpc.akka.messages.CallableMessage;
+import org.apache.flink.runtime.rpc.akka.messages.RunnableMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BaseAkkaActor extends UntypedActor {
+       private static final Logger LOG = 
LoggerFactory.getLogger(BaseAkkaActor.class);
+
+       @Override
+       public void onReceive(Object message) throws Exception {
+               if (message instanceof RunnableMessage) {
+                       try {
+                               ((RunnableMessage) message).getRunnable().run();
+                       } catch (Exception e) {
+                               LOG.error("Encountered error while executing 
runnable.", e);
+                       }
+               } else if (message instanceof CallableMessage<?>) {
+                       try {
+                               Object result = ((CallableMessage<?>) 
message).getCallable().call();
+                               sender().tell(new Status.Success(result), 
getSelf());
+                       } catch (Exception e) {
+                               sender().tell(new Status.Failure(e), getSelf());
+                       }
+               } else {
+                       throw new RuntimeException("Unknown message " + 
message);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/04bcb715/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/BaseAkkaGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/BaseAkkaGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/BaseAkkaGateway.java
new file mode 100644
index 0000000..512790d
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/BaseAkkaGateway.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka;
+
+import akka.actor.ActorRef;
+import akka.pattern.Patterns;
+import akka.util.Timeout;
+import org.apache.flink.runtime.rpc.MainThreadExecutor;
+import org.apache.flink.runtime.rpc.akka.messages.CallableMessage;
+import org.apache.flink.runtime.rpc.akka.messages.RunnableMessage;
+import scala.concurrent.Future;
+
+import java.util.concurrent.Callable;
+
+public abstract class BaseAkkaGateway implements MainThreadExecutor, 
AkkaGateway {
+       @Override
+       public void runAsync(Runnable runnable) {
+               getActorRef().tell(new RunnableMessage(runnable), 
ActorRef.noSender());
+       }
+
+       @Override
+       public <V> Future<V> callAsync(Callable<V> callable, Timeout timeout) {
+               return (Future<V>) Patterns.ask(getActorRef(), new 
CallableMessage(callable), timeout);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/04bcb715/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/jobmaster/JobMasterAkkaActor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/jobmaster/JobMasterAkkaActor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/jobmaster/JobMasterAkkaActor.java
new file mode 100644
index 0000000..9e04ea9
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/jobmaster/JobMasterAkkaActor.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka.jobmaster;
+
+import akka.actor.ActorRef;
+import akka.actor.Status;
+import org.apache.flink.runtime.rpc.akka.BaseAkkaActor;
+import org.apache.flink.runtime.rpc.akka.messages.RegisterAtResourceManager;
+import org.apache.flink.runtime.rpc.jobmaster.JobMaster;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.rpc.akka.messages.UpdateTaskExecutionState;
+
+public class JobMasterAkkaActor extends BaseAkkaActor {
+       private final JobMaster jobMaster;
+
+       public JobMasterAkkaActor(JobMaster jobMaster) {
+               this.jobMaster = jobMaster;
+       }
+
+       @Override
+       public void onReceive(Object message) throws Exception {
+               if (message instanceof UpdateTaskExecutionState) {
+
+                       final ActorRef sender = getSender();
+
+                       UpdateTaskExecutionState updateTaskExecutionState = 
(UpdateTaskExecutionState) message;
+
+                       try {
+                               Acknowledge result = 
jobMaster.updateTaskExecutionState(updateTaskExecutionState.getTaskExecutionState());
+                               sender.tell(new Status.Success(result), 
getSelf());
+                       } catch (Exception e) {
+                               sender.tell(new Status.Failure(e), getSelf());
+                       }
+               } else if (message instanceof RegisterAtResourceManager) {
+                       RegisterAtResourceManager registerAtResourceManager = 
(RegisterAtResourceManager) message;
+
+                       
jobMaster.registerAtResourceManager(registerAtResourceManager.getAddress());
+               } else {
+                       super.onReceive(message);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/04bcb715/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/jobmaster/JobMasterAkkaGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/jobmaster/JobMasterAkkaGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/jobmaster/JobMasterAkkaGateway.java
new file mode 100644
index 0000000..e6bf061
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/jobmaster/JobMasterAkkaGateway.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka.jobmaster;
+
+import akka.actor.ActorRef;
+import akka.pattern.AskableActorRef;
+import akka.util.Timeout;
+import org.apache.flink.runtime.rpc.akka.BaseAkkaGateway;
+import org.apache.flink.runtime.rpc.akka.messages.RegisterAtResourceManager;
+import org.apache.flink.runtime.rpc.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.rpc.akka.messages.UpdateTaskExecutionState;
+import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import scala.concurrent.Future;
+import scala.reflect.ClassTag$;
+
+public class JobMasterAkkaGateway extends BaseAkkaGateway implements 
JobMasterGateway {
+       private final AskableActorRef actorRef;
+       private final Timeout timeout;
+
+       public JobMasterAkkaGateway(ActorRef actorRef, Timeout timeout) {
+               this.actorRef = new AskableActorRef(actorRef);
+               this.timeout = timeout;
+       }
+
+       @Override
+       public Future<Acknowledge> updateTaskExecutionState(TaskExecutionState 
taskExecutionState) {
+               return actorRef.ask(new 
UpdateTaskExecutionState(taskExecutionState), timeout)
+                       
.mapTo(ClassTag$.MODULE$.<Acknowledge>apply(Acknowledge.class));
+       }
+
+       @Override
+       public void registerAtResourceManager(String address) {
+               actorRef.actorRef().tell(new 
RegisterAtResourceManager(address), actorRef.actorRef());
+       }
+
+       @Override
+       public ActorRef getActorRef() {
+               return actorRef.actorRef();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/04bcb715/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/CallableMessage.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/CallableMessage.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/CallableMessage.java
new file mode 100644
index 0000000..f0e555f
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/CallableMessage.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka.messages;
+
+import java.util.concurrent.Callable;
+
+public class CallableMessage<V> {
+       private final Callable<V> callable;
+
+       public CallableMessage(Callable<V> callable) {
+               this.callable = callable;
+       }
+
+       public Callable<V> getCallable() {
+               return callable;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/04bcb715/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/CancelTask.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/CancelTask.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/CancelTask.java
new file mode 100644
index 0000000..0b9e9dc
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/CancelTask.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka.messages;
+
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+
+import java.io.Serializable;
+
+public class CancelTask implements Serializable {
+       private static final long serialVersionUID = -2998176874447950595L;
+       private final ExecutionAttemptID executionAttemptID;
+
+       public CancelTask(ExecutionAttemptID executionAttemptID) {
+               this.executionAttemptID = executionAttemptID;
+       }
+
+       public ExecutionAttemptID getExecutionAttemptID() {
+               return executionAttemptID;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/04bcb715/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/ExecuteTask.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/ExecuteTask.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/ExecuteTask.java
new file mode 100644
index 0000000..a83d539
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/ExecuteTask.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka.messages;
+
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+
+import java.io.Serializable;
+
+public class ExecuteTask implements Serializable {
+       private static final long serialVersionUID = -6769958430967048348L;
+       private final TaskDeploymentDescriptor taskDeploymentDescriptor;
+
+       public ExecuteTask(TaskDeploymentDescriptor taskDeploymentDescriptor) {
+               this.taskDeploymentDescriptor = taskDeploymentDescriptor;
+       }
+
+       public TaskDeploymentDescriptor getTaskDeploymentDescriptor() {
+               return taskDeploymentDescriptor;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/04bcb715/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RegisterAtResourceManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RegisterAtResourceManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RegisterAtResourceManager.java
new file mode 100644
index 0000000..3ade082
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RegisterAtResourceManager.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka.messages;
+
+import java.io.Serializable;
+
+public class RegisterAtResourceManager implements Serializable {
+
+       private static final long serialVersionUID = -4175905742620903602L;
+
+       private final String address;
+
+       public RegisterAtResourceManager(String address) {
+               this.address = address;
+       }
+
+       public String getAddress() {
+               return address;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/04bcb715/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RegisterJobMaster.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RegisterJobMaster.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RegisterJobMaster.java
new file mode 100644
index 0000000..b35ea38
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RegisterJobMaster.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka.messages;
+
+import org.apache.flink.runtime.rpc.resourcemanager.JobMasterRegistration;
+
+import java.io.Serializable;
+
+public class RegisterJobMaster implements Serializable{
+       private static final long serialVersionUID = -4616879574192641507L;
+       private final JobMasterRegistration jobMasterRegistration;
+
+       public RegisterJobMaster(JobMasterRegistration jobMasterRegistration) {
+               this.jobMasterRegistration = jobMasterRegistration;
+       }
+
+       public JobMasterRegistration getJobMasterRegistration() {
+               return jobMasterRegistration;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/04bcb715/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RequestSlot.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RequestSlot.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RequestSlot.java
new file mode 100644
index 0000000..85ceeec
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RequestSlot.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka.messages;
+
+import org.apache.flink.runtime.rpc.resourcemanager.SlotRequest;
+
+import java.io.Serializable;
+
+public class RequestSlot implements Serializable {
+       private static final long serialVersionUID = 7207463889348525866L;
+
+       private final SlotRequest slotRequest;
+
+       public RequestSlot(SlotRequest slotRequest) {
+               this.slotRequest = slotRequest;
+       }
+
+       public SlotRequest getSlotRequest() {
+               return slotRequest;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/04bcb715/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RunnableMessage.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RunnableMessage.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RunnableMessage.java
new file mode 100644
index 0000000..3556738
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RunnableMessage.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka.messages;
+
+public class RunnableMessage {
+       private final Runnable runnable;
+
+       public RunnableMessage(Runnable runnable) {
+               this.runnable = runnable;
+       }
+
+       public Runnable getRunnable() {
+               return runnable;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/04bcb715/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/UpdateTaskExecutionState.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/UpdateTaskExecutionState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/UpdateTaskExecutionState.java
new file mode 100644
index 0000000..f89cd2f
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/UpdateTaskExecutionState.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka.messages;
+
+import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+
+import java.io.Serializable;
+
+public class UpdateTaskExecutionState implements Serializable{
+       private static final long serialVersionUID = -6662229114427331436L;
+
+       private final TaskExecutionState taskExecutionState;
+
+       public UpdateTaskExecutionState(TaskExecutionState taskExecutionState) {
+               this.taskExecutionState = taskExecutionState;
+       }
+
+       public TaskExecutionState getTaskExecutionState() {
+               return taskExecutionState;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/04bcb715/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/resourcemanager/ResourceManagerAkkaActor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/resourcemanager/ResourceManagerAkkaActor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/resourcemanager/ResourceManagerAkkaActor.java
new file mode 100644
index 0000000..13101f9
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/resourcemanager/ResourceManagerAkkaActor.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka.resourcemanager;
+
+import akka.actor.ActorRef;
+import akka.actor.Status;
+import akka.pattern.Patterns;
+import org.apache.flink.runtime.rpc.akka.BaseAkkaActor;
+import org.apache.flink.runtime.rpc.resourcemanager.RegistrationResponse;
+import org.apache.flink.runtime.rpc.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.rpc.resourcemanager.SlotAssignment;
+import org.apache.flink.runtime.rpc.akka.messages.RegisterJobMaster;
+import org.apache.flink.runtime.rpc.akka.messages.RequestSlot;
+import scala.concurrent.Future;
+
+public class ResourceManagerAkkaActor extends BaseAkkaActor {
+       private final ResourceManager resourceManager;
+
+       public ResourceManagerAkkaActor(ResourceManager resourceManager) {
+               this.resourceManager = resourceManager;
+       }
+
+       @Override
+       public void onReceive(Object message) throws Exception {
+               final ActorRef sender = getSender();
+
+               if (message instanceof RegisterJobMaster) {
+                       RegisterJobMaster registerJobMaster = 
(RegisterJobMaster) message;
+
+                       try {
+                               Future<RegistrationResponse> response = 
resourceManager.registerJobMaster(registerJobMaster.getJobMasterRegistration());
+                               Patterns.pipe(response, 
getContext().dispatcher()).to(sender());
+                       } catch (Exception e) {
+                               sender.tell(new Status.Failure(e), getSelf());
+                       }
+               } else if (message instanceof RequestSlot) {
+                       RequestSlot requestSlot = (RequestSlot) message;
+
+                       try {
+                               SlotAssignment response = 
resourceManager.requestSlot(requestSlot.getSlotRequest());
+                               sender.tell(new Status.Success(response), 
getSelf());
+                       } catch (Exception e) {
+                               sender.tell(new Status.Failure(e), getSelf());
+                       }
+               } else {
+                       super.onReceive(message);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/04bcb715/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/resourcemanager/ResourceManagerAkkaGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/resourcemanager/ResourceManagerAkkaGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/resourcemanager/ResourceManagerAkkaGateway.java
new file mode 100644
index 0000000..1304707
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/resourcemanager/ResourceManagerAkkaGateway.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka.resourcemanager;
+
+import akka.actor.ActorRef;
+import akka.pattern.AskableActorRef;
+import akka.util.Timeout;
+import org.apache.flink.runtime.rpc.akka.BaseAkkaGateway;
+import org.apache.flink.runtime.rpc.resourcemanager.JobMasterRegistration;
+import org.apache.flink.runtime.rpc.resourcemanager.RegistrationResponse;
+import org.apache.flink.runtime.rpc.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rpc.resourcemanager.SlotAssignment;
+import org.apache.flink.runtime.rpc.resourcemanager.SlotRequest;
+import org.apache.flink.runtime.rpc.akka.messages.RegisterJobMaster;
+import org.apache.flink.runtime.rpc.akka.messages.RequestSlot;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+import scala.reflect.ClassTag$;
+
+public class ResourceManagerAkkaGateway extends BaseAkkaGateway implements 
ResourceManagerGateway {
+       private final AskableActorRef actorRef;
+       private final Timeout timeout;
+
+       public ResourceManagerAkkaGateway(ActorRef actorRef, Timeout timeout) {
+               this.actorRef = new AskableActorRef(actorRef);
+               this.timeout = timeout;
+       }
+
+       @Override
+       public Future<RegistrationResponse> 
registerJobMaster(JobMasterRegistration jobMasterRegistration, FiniteDuration 
timeout) {
+               return actorRef.ask(new 
RegisterJobMaster(jobMasterRegistration), new Timeout(timeout))
+                       
.mapTo(ClassTag$.MODULE$.<RegistrationResponse>apply(RegistrationResponse.class));
+       }
+
+       @Override
+       public Future<RegistrationResponse> 
registerJobMaster(JobMasterRegistration jobMasterRegistration) {
+               return actorRef.ask(new 
RegisterJobMaster(jobMasterRegistration), timeout)
+                       
.mapTo(ClassTag$.MODULE$.<RegistrationResponse>apply(RegistrationResponse.class));
+       }
+
+       @Override
+       public Future<SlotAssignment> requestSlot(SlotRequest slotRequest) {
+               return actorRef.ask(new RequestSlot(slotRequest), timeout)
+                       
.mapTo(ClassTag$.MODULE$.<SlotAssignment>apply(SlotAssignment.class));
+       }
+
+       @Override
+       public ActorRef getActorRef() {
+               return actorRef.actorRef();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/04bcb715/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/taskexecutor/TaskExecutorAkkaActor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/taskexecutor/TaskExecutorAkkaActor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/taskexecutor/TaskExecutorAkkaActor.java
new file mode 100644
index 0000000..ed522cc
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/taskexecutor/TaskExecutorAkkaActor.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka.taskexecutor;
+
+import akka.actor.ActorRef;
+import akka.actor.Status;
+import akka.dispatch.OnComplete;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.rpc.akka.BaseAkkaActor;
+import org.apache.flink.runtime.rpc.akka.messages.CancelTask;
+import org.apache.flink.runtime.rpc.akka.messages.ExecuteTask;
+import org.apache.flink.runtime.rpc.taskexecutor.TaskExecutorGateway;
+
+public class TaskExecutorAkkaActor extends BaseAkkaActor {
+       private final TaskExecutorGateway taskExecutor;
+
+       public TaskExecutorAkkaActor(TaskExecutorGateway taskExecutor) {
+               this.taskExecutor = taskExecutor;
+       }
+
+       @Override
+       public void onReceive(Object message) throws Exception {
+               final ActorRef sender = getSender();
+
+               if (message instanceof ExecuteTask) {
+                       ExecuteTask executeTask = (ExecuteTask) message;
+
+                       
taskExecutor.executeTask(executeTask.getTaskDeploymentDescriptor()).onComplete(
+                               new OnComplete<Acknowledge>() {
+                                       @Override
+                                       public void onComplete(Throwable 
failure, Acknowledge success) throws Throwable {
+                                               if (failure != null) {
+                                                       sender.tell(new 
Status.Failure(failure), getSelf());
+                                               } else {
+                                                       sender.tell(new 
Status.Success(Acknowledge.get()), getSelf());
+                                               }
+                                       }
+                               },
+                               getContext().dispatcher()
+                       );
+               } else if (message instanceof CancelTask) {
+                       CancelTask cancelTask = (CancelTask) message;
+
+                       
taskExecutor.cancelTask(cancelTask.getExecutionAttemptID()).onComplete(
+                               new OnComplete<Acknowledge>() {
+                                       @Override
+                                       public void onComplete(Throwable 
failure, Acknowledge success) throws Throwable {
+                                               if (failure != null) {
+                                                       sender.tell(new 
Status.Failure(failure), getSelf());
+                                               } else {
+                                                       sender.tell(new 
Status.Success(Acknowledge.get()), getSelf());
+                                               }
+                                       }
+                               },
+                               getContext().dispatcher()
+                       );
+               } else {
+                       super.onReceive(message);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/04bcb715/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/taskexecutor/TaskExecutorAkkaGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/taskexecutor/TaskExecutorAkkaGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/taskexecutor/TaskExecutorAkkaGateway.java
new file mode 100644
index 0000000..7f0a522
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/taskexecutor/TaskExecutorAkkaGateway.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka.taskexecutor;
+
+import akka.actor.ActorRef;
+import akka.pattern.AskableActorRef;
+import akka.util.Timeout;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.rpc.akka.BaseAkkaGateway;
+import org.apache.flink.runtime.rpc.akka.messages.CancelTask;
+import org.apache.flink.runtime.rpc.akka.messages.ExecuteTask;
+import org.apache.flink.runtime.rpc.taskexecutor.TaskExecutorGateway;
+import scala.concurrent.Future;
+import scala.reflect.ClassTag$;
+
+public class TaskExecutorAkkaGateway extends BaseAkkaGateway implements 
TaskExecutorGateway {
+       private final AskableActorRef actorRef;
+       private final Timeout timeout;
+
+       public TaskExecutorAkkaGateway(ActorRef actorRef, Timeout timeout) {
+               this.actorRef = new AskableActorRef(actorRef);
+               this.timeout = timeout;
+       }
+
+       @Override
+       public Future<Acknowledge> executeTask(TaskDeploymentDescriptor 
taskDeploymentDescriptor) {
+               return actorRef.ask(new ExecuteTask(taskDeploymentDescriptor), 
timeout)
+                       
.mapTo(ClassTag$.MODULE$.<Acknowledge>apply(Acknowledge.class));
+       }
+
+       @Override
+       public Future<Acknowledge> cancelTask(ExecutionAttemptID 
executionAttemptId) {
+               return actorRef.ask(new CancelTask(executionAttemptId), timeout)
+                       
.mapTo(ClassTag$.MODULE$.<Acknowledge>apply(Acknowledge.class));
+       }
+
+       @Override
+       public ActorRef getActorRef() {
+               return actorRef.actorRef();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/04bcb715/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMaster.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMaster.java
new file mode 100644
index 0000000..b81b19c
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMaster.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.jobmaster;
+
+import akka.dispatch.Futures;
+import akka.dispatch.Mapper;
+import akka.dispatch.OnComplete;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.rpc.RpcMethod;
+import org.apache.flink.runtime.rpc.resourcemanager.JobMasterRegistration;
+import org.apache.flink.runtime.rpc.resourcemanager.RegistrationResponse;
+import org.apache.flink.runtime.rpc.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rpc.RpcEndpoint;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import scala.Tuple2;
+import scala.concurrent.ExecutionContext;
+import scala.concurrent.ExecutionContext$;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * JobMaster implementation. The job master is responsible for the execution 
of a single
+ * {@link org.apache.flink.runtime.jobgraph.JobGraph}.
+ *
+ * It offers the following methods as part of its rpc interface to interact 
with the JobMaster
+ * remotely:
+ * <ul>
+ *     <li>{@link #registerAtResourceManager(String)} triggers the 
registration at the resource manager</li>
+ *     <li>{@link #updateTaskExecutionState(TaskExecutionState)} updates the 
task execution state for
+ * given task</li>
+ * </ul>
+ */
+public class JobMaster extends RpcEndpoint<JobMasterGateway> {
+       /** Execution context for future callbacks */
+       private final ExecutionContext executionContext;
+
+       /** Execution context for scheduled runnables */
+       private final ScheduledExecutorService scheduledExecutorService;
+
+       private final FiniteDuration initialRegistrationTimeout = new 
FiniteDuration(500, TimeUnit.MILLISECONDS);
+       private final FiniteDuration maxRegistrationTimeout = new 
FiniteDuration(30, TimeUnit.SECONDS);
+       private final FiniteDuration registrationDuration = new 
FiniteDuration(365, TimeUnit.DAYS);
+       private final long failedRegistrationDelay = 10000;
+
+       /** Gateway to connected resource manager, null iff not connected */
+       private ResourceManagerGateway resourceManager = null;
+
+       /** UUID to filter out old registration runs */
+       private UUID currentRegistrationRun;
+
+       public JobMaster(RpcService rpcService, ExecutorService 
executorService) {
+               super(rpcService);
+               executionContext = 
ExecutionContext$.MODULE$.fromExecutor(executorService);
+               scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
+       }
+
+       public ResourceManagerGateway getResourceManager() {
+               return resourceManager;
+       }
+
+       
//----------------------------------------------------------------------------------------------
+       // RPC methods
+       
//----------------------------------------------------------------------------------------------
+
+       /**
+        * Updates the task execution state for a given task.
+        *
+        * @param taskExecutionState New task execution state for a given task
+        * @return Acknowledge the task execution state update
+        */
+       @RpcMethod
+       public Acknowledge updateTaskExecutionState(TaskExecutionState 
taskExecutionState) {
+               System.out.println("TaskExecutionState: " + taskExecutionState);
+               return Acknowledge.get();
+       }
+
+       /**
+        * Triggers the registration of the job master at the resource manager.
+        *
+        * @param address Address of the resource manager
+        */
+       @RpcMethod
+       public void registerAtResourceManager(final String address) {
+               currentRegistrationRun = UUID.randomUUID();
+
+               Future<ResourceManagerGateway> resourceManagerFuture = 
getRpcService().connect(address, ResourceManagerGateway.class);
+
+               handleResourceManagerRegistration(
+                       new JobMasterRegistration(getAddress()),
+                       1,
+                       resourceManagerFuture,
+                       currentRegistrationRun,
+                       initialRegistrationTimeout,
+                       maxRegistrationTimeout,
+                       registrationDuration.fromNow());
+       }
+
+       
//----------------------------------------------------------------------------------------------
+       // Helper methods
+       
//----------------------------------------------------------------------------------------------
+
+       /**
+        * Helper method to handle the resource manager registration process. 
If a registration attempt
+        * times out, then a new attempt with the doubled time out is 
initiated. The whole registration
+        * process has a deadline. Once this deadline is overdue without 
successful registration, the
+        * job master shuts down.
+        *
+        * @param jobMasterRegistration Job master registration info which is 
sent to the resource
+        *                              manager
+        * @param attemptNumber Registration attempt number
+        * @param resourceManagerFuture Future of the resource manager gateway
+        * @param registrationRun UUID describing the current registration run
+        * @param timeout Timeout of the last registration attempt
+        * @param maxTimeout Maximum timeout between registration attempts
+        * @param deadline Deadline for the registration
+        */
+       void handleResourceManagerRegistration(
+               final JobMasterRegistration jobMasterRegistration,
+               final int attemptNumber,
+               final Future<ResourceManagerGateway> resourceManagerFuture,
+               final UUID registrationRun,
+               final FiniteDuration timeout,
+               final FiniteDuration maxTimeout,
+               final Deadline deadline) {
+
+               // filter out concurrent registration runs
+               if (registrationRun.equals(currentRegistrationRun)) {
+
+                       log.info("Start registration attempt #{}.", 
attemptNumber);
+
+                       if (deadline.isOverdue()) {
+                               // we've exceeded our registration deadline. 
This means that we have to shutdown the JobMaster
+                               log.error("Exceeded registration deadline 
without successfully registering at the ResourceManager.");
+                               shutDown();
+                       } else {
+                               Future<Tuple2<RegistrationResponse, 
ResourceManagerGateway>> registrationResponseFuture = 
resourceManagerFuture.flatMap(new Mapper<ResourceManagerGateway, 
Future<Tuple2<RegistrationResponse, ResourceManagerGateway>>>() {
+                                       @Override
+                                       public 
Future<Tuple2<RegistrationResponse, ResourceManagerGateway>> 
apply(ResourceManagerGateway resourceManagerGateway) {
+                                               return 
resourceManagerGateway.registerJobMaster(jobMasterRegistration, 
timeout).zip(Futures.successful(resourceManagerGateway));
+                                       }
+                               }, executionContext);
+
+                               registrationResponseFuture.onComplete(new 
OnComplete<Tuple2<RegistrationResponse, ResourceManagerGateway>>() {
+                                       @Override
+                                       public void onComplete(Throwable 
failure, Tuple2<RegistrationResponse, ResourceManagerGateway> tuple) throws 
Throwable {
+                                               if (failure != null) {
+                                                       if (failure instanceof 
TimeoutException) {
+                                                               // we haven't 
received an answer in the given timeout interval,
+                                                               // so increase 
it and try again.
+                                                               final 
FiniteDuration newTimeout = timeout.$times(2L).min(maxTimeout);
+
+                                                               
handleResourceManagerRegistration(
+                                                                       
jobMasterRegistration,
+                                                                       
attemptNumber + 1,
+                                                                       
resourceManagerFuture,
+                                                                       
registrationRun,
+                                                                       
newTimeout,
+                                                                       
maxTimeout,
+                                                                       
deadline);
+                                                       } else {
+                                                               
log.error("Received unknown error while registering at the ResourceManager.", 
failure);
+                                                               shutDown();
+                                                       }
+                                               } else {
+                                                       final 
RegistrationResponse response = tuple._1();
+                                                       final 
ResourceManagerGateway gateway = tuple._2();
+
+                                                       if 
(response.isSuccess()) {
+                                                               
finishResourceManagerRegistration(gateway, response.getInstanceID());
+                                                       } else {
+                                                               log.info("The 
registration was refused. Try again.");
+
+                                                               
scheduledExecutorService.schedule(new Runnable() {
+                                                                       
@Override
+                                                                       public 
void run() {
+                                                                               
// we have to execute scheduled runnable in the main thread
+                                                                               
// because we need consistency wrt currentRegistrationRun
+                                                                               
runAsync(new Runnable() {
+                                                                               
        @Override
+                                                                               
        public void run() {
+                                                                               
                // our registration attempt was refused. Start over.
+                                                                               
                handleResourceManagerRegistration(
+                                                                               
                        jobMasterRegistration,
+                                                                               
                        1,
+                                                                               
                        resourceManagerFuture,
+                                                                               
                        registrationRun,
+                                                                               
                        initialRegistrationTimeout,
+                                                                               
                        maxTimeout,
+                                                                               
                        deadline);
+                                                                               
        }
+                                                                               
});
+                                                                       }
+                                                               }, 
failedRegistrationDelay, TimeUnit.MILLISECONDS);
+                                                       }
+                                               }
+                                       }
+                               }, getMainThreadExecutionContext()); // use the 
main thread execution context to execute the call back in the main thread
+                       }
+               } else {
+                       log.info("Discard out-dated registration run.");
+               }
+       }
+
+       /**
+        * Finish the resource manager registration by setting the new resource 
manager gateway.
+        *
+        * @param resourceManager New resource manager gateway
+        * @param instanceID Instance id assigned by the resource manager
+        */
+       void finishResourceManagerRegistration(ResourceManagerGateway 
resourceManager, InstanceID instanceID) {
+               log.info("Successfully registered at the ResourceManager under 
instance id {}.", instanceID);
+               this.resourceManager = resourceManager;
+       }
+
+       /**
+        * Return if the job master is connected to a resource manager.
+        *
+        * @return true if the job master is connected to the resource manager
+        */
+       public boolean isConnected() {
+               return resourceManager != null;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/04bcb715/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMasterGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMasterGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMasterGateway.java
new file mode 100644
index 0000000..17a4c3a
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMasterGateway.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.jobmaster;
+
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import scala.concurrent.Future;
+
+/**
+ * {@link JobMaster} rpc gateway interface
+ */
+public interface JobMasterGateway extends RpcGateway {
+
+       /**
+        * Updates the task execution state for a given task.
+        *
+        * @param taskExecutionState New task execution state for a given task
+        * @return Future acknowledge of the task execution state update
+        */
+       Future<Acknowledge> updateTaskExecutionState(TaskExecutionState 
taskExecutionState);
+
+       /**
+        * Triggers the registration of the job master at the resource manager.
+        *
+        * @param address Address of the resource manager
+        */
+       void registerAtResourceManager(final String address);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/04bcb715/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/JobMasterRegistration.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/JobMasterRegistration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/JobMasterRegistration.java
new file mode 100644
index 0000000..7a2deae
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/JobMasterRegistration.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.resourcemanager;
+
+import java.io.Serializable;
+
+public class JobMasterRegistration implements Serializable {
+       private static final long serialVersionUID = 8411214999193765202L;
+
+       private final String address;
+
+       public JobMasterRegistration(String address) {
+               this.address = address;
+       }
+
+       public String getAddress() {
+               return address;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/04bcb715/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/RegistrationResponse.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/RegistrationResponse.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/RegistrationResponse.java
new file mode 100644
index 0000000..8ac9e49
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/RegistrationResponse.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.resourcemanager;
+
+import org.apache.flink.runtime.instance.InstanceID;
+
+import java.io.Serializable;
+
+public class RegistrationResponse implements Serializable {
+       private static final long serialVersionUID = -2379003255993119993L;
+
+       private final boolean isSuccess;
+       private final InstanceID instanceID;
+
+       public RegistrationResponse(boolean isSuccess, InstanceID instanceID) {
+               this.isSuccess = isSuccess;
+               this.instanceID = instanceID;
+       }
+
+       public boolean isSuccess() {
+               return isSuccess;
+       }
+
+       public InstanceID getInstanceID() {
+               return instanceID;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/04bcb715/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java
new file mode 100644
index 0000000..c7e8def
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.resourcemanager;
+
+import akka.dispatch.Mapper;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.rpc.RpcMethod;
+import org.apache.flink.runtime.rpc.RpcEndpoint;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.jobmaster.JobMaster;
+import org.apache.flink.runtime.rpc.jobmaster.JobMasterGateway;
+import scala.concurrent.ExecutionContext;
+import scala.concurrent.ExecutionContext$;
+import scala.concurrent.Future;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * ResourceManager implementation. The resource manager is responsible for 
resource de-/allocation
+ * and bookkeeping.
+ *
+ * It offers the following methods as part of its rpc interface to interact 
with the him remotely:
+ * <ul>
+ *     <li>{@link #registerJobMaster(JobMasterRegistration)} registers a 
{@link JobMaster} at the resource manager</li>
+ *     <li>{@link #requestSlot(SlotRequest)} requests a slot from the resource 
manager</li>
+ * </ul>
+ */
+public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> {
+       private final ExecutionContext executionContext;
+       private final Map<JobMasterGateway, InstanceID> jobMasterGateways;
+
+       public ResourceManager(RpcService rpcService, ExecutorService 
executorService) {
+               super(rpcService);
+               this.executionContext = 
ExecutionContext$.MODULE$.fromExecutor(executorService);
+               this.jobMasterGateways = new HashMap<>();
+       }
+
+       /**
+        * Register a {@link JobMaster} at the resource manager.
+        *
+        * @param jobMasterRegistration Job master registration information
+        * @return Future registration response
+        */
+       @RpcMethod
+       public Future<RegistrationResponse> 
registerJobMaster(JobMasterRegistration jobMasterRegistration) {
+               Future<JobMasterGateway> jobMasterFuture = 
getRpcService().connect(jobMasterRegistration.getAddress(), 
JobMasterGateway.class);
+
+               return jobMasterFuture.map(new Mapper<JobMasterGateway, 
RegistrationResponse>() {
+                       @Override
+                       public RegistrationResponse apply(final 
JobMasterGateway jobMasterGateway) {
+                               InstanceID instanceID;
+
+                               if 
(jobMasterGateways.containsKey(jobMasterGateway)) {
+                                       instanceID = 
jobMasterGateways.get(jobMasterGateway);
+                               } else {
+                                       instanceID = new InstanceID();
+                                       jobMasterGateways.put(jobMasterGateway, 
instanceID);
+                               }
+
+                               return new RegistrationResponse(true, 
instanceID);
+                       }
+               }, getMainThreadExecutionContext());
+       }
+
+       /**
+        * Requests a slot from the resource manager.
+        *
+        * @param slotRequest Slot request
+        * @return Slot assignment
+        */
+       @RpcMethod
+       public SlotAssignment requestSlot(SlotRequest slotRequest) {
+               System.out.println("SlotRequest: " + slotRequest);
+               return new SlotAssignment();
+       }
+}

Reply via email to