Repository: flink
Updated Branches:
  refs/heads/master d7cea586e -> 1804aa33d


http://git-wip-us.apache.org/repos/asf/flink/blob/1804aa33/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/LocalFencedMessage.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/LocalFencedMessage.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/LocalFencedMessage.java
new file mode 100644
index 0000000..2481065
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/LocalFencedMessage.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.messages;
+
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+
+/**
+ * Local {@link FencedMessage} implementation. This message is used when the 
communication
+ * is local and thus does not require its payload to be serializable.
+ *
+ * @param <F> type of the fencing token
+ * @param <P> type of the payload
+ */
+public class LocalFencedMessage<F extends Serializable, P> implements 
FencedMessage<F, P> {
+
+       private final F fencingToken;
+       private final P payload;
+
+       public LocalFencedMessage(F fencingToken, P payload) {
+               this.fencingToken = Preconditions.checkNotNull(fencingToken);
+               this.payload = Preconditions.checkNotNull(payload);
+       }
+
+       @Override
+       public F getFencingToken() {
+               return fencingToken;
+       }
+
+       @Override
+       public P getPayload() {
+               return payload;
+       }
+
+       @Override
+       public String toString() {
+               return "LocalFencedMessage(" + fencingToken + ", " + payload + 
')';
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1804aa33/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/LocalRpcInvocation.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/LocalRpcInvocation.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/LocalRpcInvocation.java
new file mode 100644
index 0000000..0bd06c3
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/LocalRpcInvocation.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.messages;
+
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Local rpc invocation message containing the remote procedure name, its 
parameter types and the
+ * corresponding call arguments. This message will only be sent if the 
communication is local and,
+ * thus, the message does not have to be serialized.
+ */
+public final class LocalRpcInvocation implements RpcInvocation {
+
+       private final String methodName;
+       private final Class<?>[] parameterTypes;
+       private final Object[] args;
+
+       private transient String toString;
+
+       public LocalRpcInvocation(String methodName, Class<?>[] parameterTypes, 
Object[] args) {
+               this.methodName = Preconditions.checkNotNull(methodName);
+               this.parameterTypes = 
Preconditions.checkNotNull(parameterTypes);
+               this.args = args;
+
+               toString = null;
+       }
+
+       @Override
+       public String getMethodName() {
+               return methodName;
+       }
+
+       @Override
+       public Class<?>[] getParameterTypes() {
+               return parameterTypes;
+       }
+
+       @Override
+       public Object[] getArgs() {
+               return args;
+       }
+
+       @Override
+       public String toString() {
+               if (toString == null) {
+                       StringBuilder paramTypeStringBuilder = new 
StringBuilder(parameterTypes.length * 5);
+
+                       if (parameterTypes.length > 0) {
+                               
paramTypeStringBuilder.append(parameterTypes[0].getSimpleName());
+
+                               for (int i = 1; i < parameterTypes.length; i++) 
{
+                                       paramTypeStringBuilder
+                                               .append(", ")
+                                               
.append(parameterTypes[i].getSimpleName());
+                               }
+                       }
+
+                       toString = "LocalRpcInvocation(" + methodName + '(' + 
paramTypeStringBuilder + "))";
+               }
+
+               return toString;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1804aa33/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/RemoteFencedMessage.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/RemoteFencedMessage.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/RemoteFencedMessage.java
new file mode 100644
index 0000000..5cf9b98
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/RemoteFencedMessage.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.messages;
+
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+
+/**
+ * Remote {@link FencedMessage} implementation. This message is used when the 
communication
+ * is remote and thus requires its payload to be serializable.
+ *
+ * @param <F> type of the fencing token
+ * @param <P> type of the payload
+ */
+public class RemoteFencedMessage<F extends Serializable, P extends 
Serializable> implements FencedMessage<F, P>, Serializable {
+       private static final long serialVersionUID = 4043136067468477742L;
+
+       private final F fencingToken;
+       private final P payload;
+
+       public RemoteFencedMessage(F fencingToken, P payload) {
+               this.fencingToken = Preconditions.checkNotNull(fencingToken);
+               this.payload = Preconditions.checkNotNull(payload);
+       }
+
+       @Override
+       public F getFencingToken() {
+               return fencingToken;
+       }
+
+       @Override
+       public P getPayload() {
+               return payload;
+       }
+
+       @Override
+       public String toString() {
+               return "RemoteFencedMessage(" + fencingToken + ", " + payload + 
')';
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1804aa33/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/RemoteRpcInvocation.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/RemoteRpcInvocation.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/RemoteRpcInvocation.java
new file mode 100644
index 0000000..779d5dd
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/RemoteRpcInvocation.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.messages;
+
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SerializedValue;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+/**
+ * Remote rpc invocation message which is used when the actor communication is 
remote and, thus, the
+ * message has to be serialized.
+ * <p>
+ * In order to fail fast and report an appropriate error message to the user, 
the method name, the
+ * parameter types and the arguments are eagerly serialized. In case the the 
invocation call
+ * contains a non-serializable object, then an {@link IOException} is thrown.
+ */
+public class RemoteRpcInvocation implements RpcInvocation, Serializable {
+       private static final long serialVersionUID = 6179354390913843809L;
+
+       // Serialized invocation data
+       private SerializedValue<RemoteRpcInvocation.MethodInvocation> 
serializedMethodInvocation;
+
+       // Transient field which is lazily initialized upon first access to the 
invocation data
+       private transient RemoteRpcInvocation.MethodInvocation methodInvocation;
+
+       private transient String toString;
+
+       public  RemoteRpcInvocation(
+               final String methodName,
+               final Class<?>[] parameterTypes,
+               final Object[] args) throws IOException {
+
+               serializedMethodInvocation = new SerializedValue<>(new 
RemoteRpcInvocation.MethodInvocation(methodName, parameterTypes, args));
+               methodInvocation = null;
+       }
+
+       @Override
+       public String getMethodName() throws IOException, 
ClassNotFoundException {
+               deserializeMethodInvocation();
+
+               return methodInvocation.getMethodName();
+       }
+
+       @Override
+       public Class<?>[] getParameterTypes() throws IOException, 
ClassNotFoundException {
+               deserializeMethodInvocation();
+
+               return methodInvocation.getParameterTypes();
+       }
+
+       @Override
+       public Object[] getArgs() throws IOException, ClassNotFoundException {
+               deserializeMethodInvocation();
+
+               return methodInvocation.getArgs();
+       }
+
+       @Override
+       public String toString() {
+               if (toString == null) {
+
+                       try {
+                               Class<?>[] parameterTypes = getParameterTypes();
+                               String methodName = getMethodName();
+
+                               StringBuilder paramTypeStringBuilder = new 
StringBuilder(parameterTypes.length * 5);
+
+                               if (parameterTypes.length > 0) {
+                                       
paramTypeStringBuilder.append(parameterTypes[0].getSimpleName());
+
+                                       for (int i = 1; i < 
parameterTypes.length; i++) {
+                                               paramTypeStringBuilder
+                                                       .append(", ")
+                                                       
.append(parameterTypes[i].getSimpleName());
+                                       }
+                               }
+
+                               toString = "RemoteRpcInvocation(" + methodName 
+ '(' + paramTypeStringBuilder + "))";
+                       } catch (IOException | ClassNotFoundException e) {
+                               toString = "Could not deserialize 
RemoteRpcInvocation: " + e.getMessage();
+                       }
+               }
+
+               return toString;
+       }
+
+       /**
+        * Size (#bytes of the serialized data) of the rpc invocation message.
+        *
+        * @return Size of the remote rpc invocation message
+        */
+       public long getSize() {
+               return serializedMethodInvocation.getByteArray().length;
+       }
+
+       private void deserializeMethodInvocation() throws IOException, 
ClassNotFoundException {
+               if (methodInvocation == null) {
+                       methodInvocation = 
serializedMethodInvocation.deserializeValue(ClassLoader.getSystemClassLoader());
+               }
+       }
+
+       // -------------------------------------------------------------------
+       // Serialization methods
+       // -------------------------------------------------------------------
+
+       private void writeObject(ObjectOutputStream oos) throws IOException {
+               oos.writeObject(serializedMethodInvocation);
+       }
+
+       @SuppressWarnings("unchecked")
+       private void readObject(ObjectInputStream ois) throws IOException, 
ClassNotFoundException {
+               serializedMethodInvocation = 
(SerializedValue<RemoteRpcInvocation.MethodInvocation>) ois.readObject();
+               methodInvocation = null;
+       }
+
+       // -------------------------------------------------------------------
+       // Utility classes
+       // -------------------------------------------------------------------
+
+       /**
+        * Wrapper class for the method invocation information
+        */
+       private static final class MethodInvocation implements Serializable {
+               private static final long serialVersionUID = 
9187962608946082519L;
+
+               private String methodName;
+               private Class<?>[] parameterTypes;
+               private Object[] args;
+
+               private MethodInvocation(final String methodName, final 
Class<?>[] parameterTypes, final Object[] args) {
+                       this.methodName = methodName;
+                       this.parameterTypes = 
Preconditions.checkNotNull(parameterTypes);
+                       this.args = args;
+               }
+
+               String getMethodName() {
+                       return methodName;
+               }
+
+               Class<?>[] getParameterTypes() {
+                       return parameterTypes;
+               }
+
+               Object[] getArgs() {
+                       return args;
+               }
+
+               private void writeObject(ObjectOutputStream oos) throws 
IOException {
+                       oos.writeUTF(methodName);
+
+                       oos.writeInt(parameterTypes.length);
+
+                       for (Class<?> parameterType : parameterTypes) {
+                               oos.writeObject(parameterType);
+                       }
+
+                       if (args != null) {
+                               oos.writeBoolean(true);
+
+                               for (int i = 0; i < args.length; i++) {
+                                       try {
+                                               oos.writeObject(args[i]);
+                                       } catch (IOException e) {
+                                               throw new IOException("Could 
not serialize " + i + "th argument of method " +
+                                                       methodName + ". This 
indicates that the argument type " +
+                                                       
args.getClass().getName() + " is not serializable. Arguments have to " +
+                                                       "be serializable for 
remote rpc calls.", e);
+                                       }
+                               }
+                       } else {
+                               oos.writeBoolean(false);
+                       }
+               }
+
+               private void readObject(ObjectInputStream ois) throws 
IOException, ClassNotFoundException {
+                       methodName = ois.readUTF();
+
+                       int length = ois.readInt();
+
+                       parameterTypes = new Class<?>[length];
+
+                       for (int i = 0; i < length; i++) {
+                               try {
+                                       parameterTypes[i] = (Class<?>) 
ois.readObject();
+                               } catch (IOException e) {
+                                       throw new IOException("Could not 
deserialize " + i + "th parameter type of method " +
+                                               methodName + '.', e);
+                               } catch (ClassNotFoundException e) {
+                                       throw new ClassNotFoundException("Could 
not deserialize " + i + "th " +
+                                               "parameter type of method " + 
methodName + ". This indicates that the parameter " +
+                                               "type is not part of the system 
class loader.", e);
+                               }
+                       }
+
+                       boolean hasArgs = ois.readBoolean();
+
+                       if (hasArgs) {
+                               args = new Object[length];
+
+                               for (int i = 0; i < length; i++) {
+                                       try {
+                                               args[i] = ois.readObject();
+                                       } catch (IOException e) {
+                                               throw new IOException("Could 
not deserialize " + i + "th argument of method " +
+                                                       methodName + '.', e);
+                                       } catch (ClassNotFoundException e) {
+                                               throw new 
ClassNotFoundException("Could not deserialize " + i + "th " +
+                                                       "argument of method " + 
methodName + ". This indicates that the argument " +
+                                                       "type is not part of 
the system class loader.", e);
+                                       }
+                               }
+                       } else {
+                               args = null;
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1804aa33/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/RpcInvocation.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/RpcInvocation.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/RpcInvocation.java
new file mode 100644
index 0000000..4e9f629
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/RpcInvocation.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.messages;
+
+import java.io.IOException;
+
+/**
+ * Interface for rpc invocation messages. The interface allows to request all 
necessary information
+ * to lookup a method and call it with the corresponding arguments.
+ */
+public interface RpcInvocation {
+
+       /**
+        * Returns the method's name.
+        *
+        * @return Method name
+        * @throws IOException if the rpc invocation message is a remote 
message and could not be deserialized
+        * @throws ClassNotFoundException if the rpc invocation message is a 
remote message and contains
+        * serialized classes which cannot be found on the receiving side
+        */
+       String getMethodName() throws IOException, ClassNotFoundException;
+
+       /**
+        * Returns the method's parameter types
+        *
+        * @return Method's parameter types
+        * @throws IOException if the rpc invocation message is a remote 
message and could not be deserialized
+        * @throws ClassNotFoundException if the rpc invocation message is a 
remote message and contains
+        * serialized classes which cannot be found on the receiving side
+        */
+       Class<?>[] getParameterTypes() throws IOException, 
ClassNotFoundException;
+
+       /**
+        * Returns the arguments of the remote procedure call
+        *
+        * @return Arguments of the remote procedure call
+        * @throws IOException if the rpc invocation message is a remote 
message and could not be deserialized
+        * @throws ClassNotFoundException if the rpc invocation message is a 
remote message and contains
+        * serialized classes which cannot be found on the receiving side
+        */
+       Object[] getArgs() throws IOException, ClassNotFoundException;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1804aa33/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/RunAsync.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/RunAsync.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/RunAsync.java
new file mode 100644
index 0000000..2f6d867
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/RunAsync.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.messages;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Message for asynchronous runnable invocations
+ */
+public final class RunAsync {
+
+       /** The runnable to be executed. Transient, so it gets lost upon 
serialization */
+       private final Runnable runnable;
+
+       /** The delay after which the runnable should be called */
+       private final long atTimeNanos;
+
+       /**
+        * Creates a new {@code RunAsync} message.
+        * 
+        * @param runnable    The Runnable to run.
+        * @param atTimeNanos The time (as for System.nanoTime()) when to 
execute the runnable.
+        */
+       public RunAsync(Runnable runnable, long atTimeNanos) {
+               checkArgument(atTimeNanos >= 0);
+               this.runnable = checkNotNull(runnable);
+               this.atTimeNanos = atTimeNanos;
+       }
+
+       public Runnable getRunnable() {
+               return runnable;
+       }
+
+       public long getTimeNanos() {
+               return atTimeNanos;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1804aa33/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/Shutdown.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/Shutdown.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/Shutdown.java
new file mode 100644
index 0000000..50b076c
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/Shutdown.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.messages;
+
+import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+
+/**
+ * Shut down message used to trigger the shut down of an AkkaRpcActor. This
+ * message is only intended for internal use by the {@link AkkaRpcService}.
+ */
+public final class Shutdown implements ControlMessage {
+
+       private static Shutdown instance = new Shutdown();
+
+       public static Shutdown getInstance() {
+               return instance;
+       }
+
+       private Shutdown() {}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1804aa33/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/UnfencedMessage.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/UnfencedMessage.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/UnfencedMessage.java
new file mode 100644
index 0000000..27867c4
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/UnfencedMessage.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.messages;
+
+import org.apache.flink.runtime.rpc.FencedMainThreadExecutable;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Wrapper class indicating a message which is not required to match the 
fencing token
+ * as it is used by the {@link FencedMainThreadExecutable} to run code in the 
main thread without
+ * a valid fencing token. This is required for operations which are not scoped 
by the current
+ * fencing token (e.g. leadership grants).
+ *
+ * <p>IMPORTANT: This message is only intended to be send locally.
+ *
+ * @param <P> type of the payload
+ */
+public class UnfencedMessage<P> {
+       private final P payload;
+
+       public UnfencedMessage(P payload) {
+               this.payload = Preconditions.checkNotNull(payload);
+       }
+
+       public P getPayload() {
+               return payload;
+       }
+
+       @Override
+       public String toString() {
+               return "UnfencedMessage(" + payload + ')';
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1804aa33/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
index 00762b9..f8eca16 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
@@ -23,16 +23,23 @@ import akka.actor.ActorSystem;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.rpc.exceptions.FencingTokenMismatchException;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.AfterClass;
 import org.junit.Test;
 
+import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Function;
 
 import static org.junit.Assert.*;
 
@@ -44,6 +51,8 @@ public class AsyncCallsTest extends TestLogger {
 
        private static final ActorSystem actorSystem = 
AkkaUtils.createDefaultActorSystem();
 
+       private static final Time timeout = Time.seconds(10L);
+
        private static final AkkaRpcService akkaRpcService =
                        new AkkaRpcService(actorSystem, 
Time.milliseconds(10000L));
 
@@ -162,6 +171,119 @@ public class AsyncCallsTest extends TestLogger {
                assertTrue("call was not properly delayed", ((stop - start) / 
1_000_000) >= delay);
        }
 
+       /**
+        * Tests that async code is not executed if the fencing token changes.
+        */
+       @Test
+       public void testRunAsyncWithFencing() throws Exception {
+               final Time shortTimeout = Time.milliseconds(100L);
+               final UUID newFencingToken = UUID.randomUUID();
+               final CompletableFuture<UUID> resultFuture = new 
CompletableFuture<>();
+
+               testRunAsync(
+                       endpoint -> {
+                               endpoint.runAsync(
+                                       () -> 
resultFuture.complete(endpoint.getFencingToken()));
+
+                               return resultFuture;
+                       },
+                       newFencingToken);
+
+               try {
+                       resultFuture.get(shortTimeout.toMilliseconds(), 
TimeUnit.MILLISECONDS);
+
+                       fail("The async run operation should not complete since 
it is filtered out due to the changed fencing token.");
+               } catch (TimeoutException ignored) {}
+       }
+
+       /**
+        * Tests that code can be executed in the main thread without 
respecting the fencing token.
+        */
+       @Test
+       public void testRunAsyncWithoutFencing() throws Exception {
+               final CompletableFuture<UUID> resultFuture = new 
CompletableFuture<>();
+               final UUID newFencingToken = UUID.randomUUID();
+
+               testRunAsync(
+                       endpoint -> {
+                               endpoint.runAsyncWithoutFencing(
+                                       () -> 
resultFuture.complete(endpoint.getFencingToken()));
+                               return resultFuture;
+                       },
+                       newFencingToken);
+
+               assertEquals(newFencingToken, 
resultFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS));
+       }
+
+       /**
+        * Tests that async callables are not executed if the fencing token 
changes.
+        */
+       @Test
+       public void testCallAsyncWithFencing() throws Exception {
+               final UUID newFencingToken = UUID.randomUUID();
+
+               CompletableFuture<Boolean> resultFuture = testRunAsync(
+                       endpoint -> endpoint.callAsync(() -> true, timeout),
+                       newFencingToken);
+
+               try {
+                       resultFuture.get(timeout.toMilliseconds(), 
TimeUnit.MILLISECONDS);
+
+                       fail("The async call operation should fail due to the 
changed fencing token.");
+               } catch (ExecutionException e) {
+                       assertTrue(ExceptionUtils.stripExecutionException(e) 
instanceof FencingTokenMismatchException);
+               }
+       }
+
+       /**
+        * Tests that async callables can be executed in the main thread 
without checking the fencing token.
+        */
+       @Test
+       public void testCallAsyncWithoutFencing() throws Exception {
+               final UUID newFencingToken = UUID.randomUUID();
+
+               CompletableFuture<Boolean> resultFuture = testRunAsync(
+                       endpoint -> endpoint.callAsyncWithoutFencing(() -> 
true, timeout),
+                       newFencingToken);
+
+               assertTrue(resultFuture.get(timeout.toMilliseconds(), 
TimeUnit.MILLISECONDS));
+       }
+
+       private static <T> CompletableFuture<T> 
testRunAsync(Function<FencedTestEndpoint, CompletableFuture<T>> runAsyncCall, 
UUID newFencingToken) throws Exception {
+               final UUID initialFencingToken = UUID.randomUUID();
+               final OneShotLatch enterSetNewFencingToken = new OneShotLatch();
+               final OneShotLatch triggerSetNewFencingToken = new 
OneShotLatch();
+               final FencedTestEndpoint fencedTestEndpoint = new 
FencedTestEndpoint(
+                       akkaRpcService,
+                       initialFencingToken,
+                       enterSetNewFencingToken,
+                       triggerSetNewFencingToken);
+               final FencedTestGateway fencedTestGateway = 
fencedTestEndpoint.getSelfGateway(FencedTestGateway.class);
+
+               try {
+                       fencedTestEndpoint.start();
+
+                       CompletableFuture<Acknowledge> newFencingTokenFuture = 
fencedTestGateway.setNewFencingToken(newFencingToken, timeout);
+
+                       assertFalse(newFencingTokenFuture.isDone());
+
+                       assertEquals(initialFencingToken, 
fencedTestEndpoint.getFencingToken());
+
+                       CompletableFuture<T> result = 
runAsyncCall.apply(fencedTestEndpoint);
+
+                       enterSetNewFencingToken.await();
+
+                       triggerSetNewFencingToken.trigger();
+
+                       newFencingTokenFuture.get(timeout.toMilliseconds(), 
TimeUnit.MILLISECONDS);
+
+                       return result;
+               } finally {
+                       fencedTestEndpoint.shutDown();
+                       
fencedTestEndpoint.getTerminationFuture().get(timeout.toMilliseconds(), 
TimeUnit.MILLISECONDS);
+               }
+       }
+
        // 
------------------------------------------------------------------------
        //  test RPC endpoint
        // 
------------------------------------------------------------------------
@@ -209,4 +331,39 @@ public class AsyncCallsTest extends TestLogger {
                        return concurrentAccess;
                }
        }
+
+       public interface FencedTestGateway extends FencedRpcGateway<UUID> {
+               CompletableFuture<Acknowledge> setNewFencingToken(UUID 
fencingToken, @RpcTimeout Time timeout);
+       }
+
+       public static class FencedTestEndpoint extends FencedRpcEndpoint<UUID> 
implements FencedTestGateway {
+
+               private final OneShotLatch enteringSetNewFencingToken;
+               private final OneShotLatch triggerSetNewFencingToken;
+
+               protected FencedTestEndpoint(
+                               RpcService rpcService,
+                               UUID initialFencingToken,
+                               OneShotLatch enteringSetNewFencingToken,
+                               OneShotLatch triggerSetNewFencingToken) {
+                       super(rpcService, initialFencingToken);
+
+                       this.enteringSetNewFencingToken = 
enteringSetNewFencingToken;
+                       this.triggerSetNewFencingToken = 
triggerSetNewFencingToken;
+               }
+
+               @Override
+               public CompletableFuture<Acknowledge> setNewFencingToken(UUID 
fencingToken, Time timeout) {
+                       enteringSetNewFencingToken.trigger();
+                       try {
+                               triggerSetNewFencingToken.await();
+                       } catch (InterruptedException e) {
+                               throw new 
RuntimeException("TriggerSetNewFencingToken OneShotLatch was interrupted.");
+                       }
+
+                       setFencingToken(fencingToken);
+
+                       return 
CompletableFuture.completedFuture(Acknowledge.get());
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/1804aa33/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/FencedRpcEndpointTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/FencedRpcEndpointTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/FencedRpcEndpointTest.java
new file mode 100644
index 0000000..62d5354
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/FencedRpcEndpointTest.java
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.rpc.exceptions.FencingTokenMismatchException;
+import org.apache.flink.runtime.rpc.exceptions.RpcException;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class FencedRpcEndpointTest extends TestLogger {
+
+       private static final Time timeout = Time.seconds(10L);
+       private static RpcService rpcService;
+
+       @BeforeClass
+       public static void setup() {
+               rpcService = new TestingRpcService();
+       }
+
+       @AfterClass
+       public static void teardown() throws ExecutionException, 
InterruptedException, TimeoutException {
+               if (rpcService != null) {
+                       rpcService.stopService();
+                       
rpcService.getTerminationFuture().get(timeout.toMilliseconds(), 
TimeUnit.MILLISECONDS);
+               }
+       }
+
+       /**
+        * Tests that the fencing token can be retrieved from the 
FencedRpcEndpoint and self
+        * FencedRpcGateway. Moreover it tests that you can only set the 
fencing token from
+        * the main thread.
+        */
+       @Test
+       public void testFencingTokenSetting() throws Exception {
+               final UUID initialFencingToken = UUID.randomUUID();
+               final String value = "foobar";
+               FencedTestingEndpoint fencedTestingEndpoint = new 
FencedTestingEndpoint(rpcService, initialFencingToken, value);
+               FencedTestingGateway fencedTestingGateway = 
fencedTestingEndpoint.getSelfGateway(FencedTestingGateway.class);
+               FencedTestingGateway fencedGateway = 
fencedTestingEndpoint.getSelfGateway(FencedTestingGateway.class);
+
+               try {
+                       fencedTestingEndpoint.start();
+
+                       assertEquals(initialFencingToken, 
fencedGateway.getFencingToken());
+                       assertEquals(initialFencingToken, 
fencedTestingEndpoint.getFencingToken());
+
+                       final UUID newFencingToken = UUID.randomUUID();
+
+                       try {
+                               
fencedTestingEndpoint.setFencingToken(newFencingToken);
+                               fail("Fencing token can only be set from within 
the main thread.");
+                       } catch (AssertionError ignored) {
+                               // expected to fail
+                       }
+
+                       assertEquals(initialFencingToken, 
fencedTestingEndpoint.getFencingToken());
+
+                       CompletableFuture<Acknowledge> setFencingFuture = 
fencedTestingGateway.rpcSetFencingToken(newFencingToken, timeout);
+
+                       // wait for the completion of the set fencing token 
operation
+                       setFencingFuture.get(timeout.toMilliseconds(), 
TimeUnit.MILLISECONDS);
+
+                       // self gateway should adapt its fencing token
+                       assertEquals(newFencingToken, 
fencedGateway.getFencingToken());
+                       assertEquals(newFencingToken, 
fencedTestingEndpoint.getFencingToken());
+               } finally {
+                       fencedTestingEndpoint.shutDown();
+                       
fencedTestingEndpoint.getTerminationFuture().get(timeout.toMilliseconds(), 
TimeUnit.MILLISECONDS);
+               }
+       }
+
+       /**
+        * Tests that messages with the wrong fencing token are filtered out.
+        */
+       @Test
+       public void testFencing() throws Exception {
+               final UUID initialFencingToken = UUID.randomUUID();
+               final UUID wrongFencingToken = UUID.randomUUID();
+               final String value = "barfoo";
+               FencedTestingEndpoint fencedTestingEndpoint = new 
FencedTestingEndpoint(rpcService, initialFencingToken, value);
+
+               try {
+                       fencedTestingEndpoint.start();
+
+                       final FencedTestingGateway properFencedGateway = 
rpcService.connect(fencedTestingEndpoint.getAddress(), initialFencingToken, 
FencedTestingGateway.class)
+                               .get(timeout.toMilliseconds(), 
TimeUnit.MILLISECONDS);
+                       final FencedTestingGateway wronglyFencedGateway = 
rpcService.connect(fencedTestingEndpoint.getAddress(), wrongFencingToken, 
FencedTestingGateway.class)
+                               .get(timeout.toMilliseconds(), 
TimeUnit.MILLISECONDS);
+
+                       assertEquals(value, 
properFencedGateway.foobar(timeout).get(timeout.toMilliseconds(), 
TimeUnit.MILLISECONDS));
+
+                       try {
+                               
wronglyFencedGateway.foobar(timeout).get(timeout.toMilliseconds(), 
TimeUnit.MILLISECONDS);
+                               fail("This should fail since we have the wrong 
fencing token.");
+                       } catch (ExecutionException e) {
+                               
assertTrue(ExceptionUtils.stripExecutionException(e) instanceof 
FencingTokenMismatchException);
+                       }
+
+                       final UUID newFencingToken = UUID.randomUUID();
+
+                       CompletableFuture<Acknowledge> newFencingTokenFuture = 
properFencedGateway.rpcSetFencingToken(newFencingToken, timeout);
+
+                       // wait for the new fencing token to be set
+                       newFencingTokenFuture.get(timeout.toMilliseconds(), 
TimeUnit.MILLISECONDS);
+
+                       // this should no longer work because of the new 
fencing token
+                       try {
+                               
properFencedGateway.foobar(timeout).get(timeout.toMilliseconds(), 
TimeUnit.MILLISECONDS);
+
+                               fail("This should fail since we have the wrong 
fencing token by now.");
+                       } catch (ExecutionException e) {
+                               
assertTrue(ExceptionUtils.stripExecutionException(e) instanceof 
FencingTokenMismatchException);
+                       }
+
+               } finally {
+                       fencedTestingEndpoint.shutDown();
+                       
fencedTestingEndpoint.getTerminationFuture().get(timeout.toMilliseconds(), 
TimeUnit.MILLISECONDS);
+               }
+       }
+
+       /**
+        * Tests that the self gateway always uses the current fencing token 
whereas the remote
+        * gateway has a fixed fencing token.
+        */
+       @Test
+       public void testRemoteAndSelfGateways() throws Exception {
+               final UUID initialFencingToken = UUID.randomUUID();
+               final UUID newFencingToken = UUID.randomUUID();
+               final String value = "foobar";
+
+               final FencedTestingEndpoint fencedTestingEndpoint = new 
FencedTestingEndpoint(rpcService, initialFencingToken, value);
+
+               try {
+                       fencedTestingEndpoint.start();
+
+                       FencedTestingGateway selfGateway = 
fencedTestingEndpoint.getSelfGateway(FencedTestingGateway.class);
+                       FencedTestingGateway remoteGateway = 
rpcService.connect(fencedTestingEndpoint.getAddress(), initialFencingToken, 
FencedTestingGateway.class)
+                               .get(timeout.toMilliseconds(), 
TimeUnit.MILLISECONDS);
+
+                       assertEquals(initialFencingToken, 
selfGateway.getFencingToken());
+                       assertEquals(initialFencingToken, 
remoteGateway.getFencingToken());
+
+                       assertEquals(value, 
selfGateway.foobar(timeout).get(timeout.toMilliseconds(), 
TimeUnit.MILLISECONDS));
+                       assertEquals(value, 
remoteGateway.foobar(timeout).get(timeout.toMilliseconds(), 
TimeUnit.MILLISECONDS));
+
+                       CompletableFuture<Acknowledge> newFencingTokenFuture = 
selfGateway.rpcSetFencingToken(newFencingToken, timeout);
+
+                       // wait for the new fencing token to be set
+                       newFencingTokenFuture.get(timeout.toMilliseconds(), 
TimeUnit.MILLISECONDS);
+
+                       assertEquals(newFencingToken, 
selfGateway.getFencingToken());
+                       assertNotEquals(newFencingToken, 
remoteGateway.getFencingToken());
+
+                       assertEquals(value, 
selfGateway.foobar(timeout).get(timeout.toMilliseconds(), 
TimeUnit.MILLISECONDS));
+
+                       try {
+                               
remoteGateway.foobar(timeout).get(timeout.toMilliseconds(), 
TimeUnit.MILLISECONDS);
+                               fail("This should have failed because we don't 
have the right fencing token.");
+                       } catch (ExecutionException e) {
+                               
assertTrue(ExceptionUtils.stripExecutionException(e) instanceof 
FencingTokenMismatchException);
+                       }
+               } finally {
+                       fencedTestingEndpoint.shutDown();
+                       
fencedTestingEndpoint.getTerminationFuture().get(timeout.toMilliseconds(), 
TimeUnit.MILLISECONDS);
+               }
+       }
+
+       /**
+        * Tests that call via the MainThreadExecutor fail after the fencing 
token changes.
+        */
+       @Test
+       public void testMainThreadExecutorUnderChangingFencingToken() throws 
Exception {
+               final Time shortTimeout = Time.milliseconds(100L);
+               final UUID initialFencingToken = UUID.randomUUID();
+               final String value = "foobar";
+               final FencedTestingEndpoint fencedTestingEndpoint = new 
FencedTestingEndpoint(rpcService, initialFencingToken, value);
+
+               try {
+                       fencedTestingEndpoint.start();
+
+                       FencedTestingGateway selfGateway = 
fencedTestingEndpoint.getSelfGateway(FencedTestingGateway.class);
+
+                       CompletableFuture<Acknowledge> 
mainThreadExecutorComputation = 
selfGateway.triggerMainThreadExecutorComputation(timeout);
+
+                       // we know that subsequent calls on the same gateway 
are executed sequentially
+                       // therefore, we know that the change fencing token 
call is executed after the trigger MainThreadExecutor
+                       // computation
+                       final UUID newFencingToken = UUID.randomUUID();
+                       CompletableFuture<Acknowledge> newFencingTokenFuture = 
selfGateway.rpcSetFencingToken(newFencingToken, timeout);
+
+                       newFencingTokenFuture.get(timeout.toMilliseconds(), 
TimeUnit.MILLISECONDS);
+
+                       // trigger the computation
+                       CompletableFuture<Acknowledge> triggerFuture = 
selfGateway.triggerComputationLatch(timeout);
+
+                       triggerFuture.get(timeout.toMilliseconds(), 
TimeUnit.MILLISECONDS);
+
+                       // wait for the main thread executor computation to fail
+                       try {
+                               
mainThreadExecutorComputation.get(shortTimeout.toMilliseconds(), 
TimeUnit.MILLISECONDS);
+                               fail("The MainThreadExecutor computation should 
be able to complete because it was filtered out leading to a timeout 
exception.");
+                       } catch (TimeoutException ignored) {
+                               // as predicted
+                       }
+
+               } finally {
+                       fencedTestingEndpoint.shutDown();
+                       
fencedTestingEndpoint.getTerminationFuture().get(timeout.toMilliseconds(), 
TimeUnit.MILLISECONDS);
+               }
+       }
+
+       /**
+        * Tests that all calls from an unfenced remote gateway are ignored and 
that one cannot obtain
+        * the fencing token from such a gateway.
+        */
+       @Test
+       public void testUnfencedRemoteGateway() throws Exception {
+               final UUID initialFencingToken = UUID.randomUUID();
+               final String value = "foobar";
+
+               final FencedTestingEndpoint fencedTestingEndpoint = new 
FencedTestingEndpoint(rpcService, initialFencingToken, value);
+
+               try {
+                       fencedTestingEndpoint.start();
+
+                       FencedTestingGateway unfencedGateway = 
rpcService.connect(fencedTestingEndpoint.getAddress(), 
FencedTestingGateway.class)
+                               .get(timeout.toMilliseconds(), 
TimeUnit.MILLISECONDS);
+
+                       try {
+                               
unfencedGateway.foobar(timeout).get(timeout.toMilliseconds(), 
TimeUnit.MILLISECONDS);
+                               fail("This should have failed because we have 
an unfenced gateway.");
+                       } catch (ExecutionException e) {
+                               
assertTrue(ExceptionUtils.stripExecutionException(e) instanceof RpcException);
+                       }
+
+                       try {
+                               unfencedGateway.getFencingToken();
+                               fail("We should not be able to call 
getFencingToken on an unfenced gateway.");
+                       } catch (UnsupportedOperationException ignored) {
+                               // we should not be able to call 
getFencingToken on an unfenced gateway
+                       }
+               } finally {
+                       fencedTestingEndpoint.shutDown();
+                       
fencedTestingEndpoint.getTerminationFuture().get(timeout.toMilliseconds(), 
TimeUnit.MILLISECONDS);
+               }
+       }
+
+       public interface FencedTestingGateway extends FencedRpcGateway<UUID> {
+               CompletableFuture<String> foobar(@RpcTimeout Time timeout);
+
+               CompletableFuture<Acknowledge> rpcSetFencingToken(UUID 
fencingToken, @RpcTimeout Time timeout);
+
+               CompletableFuture<Acknowledge> 
triggerMainThreadExecutorComputation(@RpcTimeout Time timeout);
+
+               CompletableFuture<Acknowledge> 
triggerComputationLatch(@RpcTimeout Time timeout);
+       }
+
+       private static class FencedTestingEndpoint extends 
FencedRpcEndpoint<UUID> implements FencedTestingGateway {
+
+               private final OneShotLatch computationLatch;
+
+               private final String value;
+
+               protected FencedTestingEndpoint(RpcService rpcService, UUID 
initialFencingToken, String value) {
+                       super(rpcService, initialFencingToken);
+
+                       computationLatch = new OneShotLatch();
+
+                       this.value = value;
+               }
+
+               @Override
+               public CompletableFuture<String> foobar(Time timeout) {
+                       return CompletableFuture.completedFuture(value);
+               }
+
+               @Override
+               public CompletableFuture<Acknowledge> rpcSetFencingToken(UUID 
fencingToken, Time timeout) {
+                       setFencingToken(fencingToken);
+
+                       return 
CompletableFuture.completedFuture(Acknowledge.get());
+               }
+
+               @Override
+               public CompletableFuture<Acknowledge> 
triggerMainThreadExecutorComputation(Time timeout) {
+                       return CompletableFuture.supplyAsync(
+                               () -> {
+                                       try {
+                                               computationLatch.await();
+                                       } catch (InterruptedException e) {
+                                               throw new 
FlinkFutureException("Waiting on latch failed.", e);
+                                       }
+
+                                       return value;
+                               },
+                               getRpcService().getExecutor())
+                       .thenApplyAsync(
+                               (String v) -> Acknowledge.get(),
+                               getMainThreadExecutor());
+               }
+
+               @Override
+               public CompletableFuture<Acknowledge> 
triggerComputationLatch(Time timeout) {
+                       computationLatch.trigger();
+
+                       return 
CompletableFuture.completedFuture(Acknowledge.get());
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1804aa33/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
index 14cf35a..4b9f397 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
 
+import java.io.Serializable;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -104,7 +105,27 @@ public class TestingRpcService extends AkkaRpcService {
                                return FutureUtils.completedExceptionally(new 
Exception("Gateway registered under " + address + " is not of type " + clazz));
                        }
                } else {
-                       return FutureUtils.completedExceptionally(new 
Exception("No gateway registered under " + address + '.'));
+                       return super.connect(address, clazz);
+               }
+       }
+
+       @Override
+       public <F extends Serializable, C extends FencedRpcGateway<F>> 
CompletableFuture<C> connect(
+                       String address,
+                       F fencingToken,
+                       Class<C> clazz) {
+               RpcGateway gateway = registeredConnections.get(address);
+
+               if (gateway != null) {
+                       if (clazz.isAssignableFrom(gateway.getClass())) {
+                               @SuppressWarnings("unchecked")
+                               C typedGateway = (C) gateway;
+                               return 
CompletableFuture.completedFuture(typedGateway);
+                       } else {
+                               return FutureUtils.completedExceptionally(new 
Exception("Gateway registered under " + address + " is not of type " + clazz));
+                       }
+               } else {
+                       return super.connect(address, fencingToken, clazz);
                }
        }
 

Reply via email to