Repository: flink Updated Branches: refs/heads/master d7cea586e -> 1804aa33d
http://git-wip-us.apache.org/repos/asf/flink/blob/1804aa33/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/LocalFencedMessage.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/LocalFencedMessage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/LocalFencedMessage.java new file mode 100644 index 0000000..2481065 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/LocalFencedMessage.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rpc.messages; + +import org.apache.flink.util.Preconditions; + +import java.io.Serializable; + +/** + * Local {@link FencedMessage} implementation. This message is used when the communication + * is local and thus does not require its payload to be serializable. 
+ * + * @param <F> type of the fencing token + * @param <P> type of the payload + */ +public class LocalFencedMessage<F extends Serializable, P> implements FencedMessage<F, P> { + + private final F fencingToken; + private final P payload; + + public LocalFencedMessage(F fencingToken, P payload) { + this.fencingToken = Preconditions.checkNotNull(fencingToken); + this.payload = Preconditions.checkNotNull(payload); + } + + @Override + public F getFencingToken() { + return fencingToken; + } + + @Override + public P getPayload() { + return payload; + } + + @Override + public String toString() { + return "LocalFencedMessage(" + fencingToken + ", " + payload + ')'; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/1804aa33/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/LocalRpcInvocation.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/LocalRpcInvocation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/LocalRpcInvocation.java new file mode 100644 index 0000000..0bd06c3 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/LocalRpcInvocation.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rpc.messages; + +import org.apache.flink.util.Preconditions; + +/** + * Local rpc invocation message containing the remote procedure name, its parameter types and the + * corresponding call arguments. This message will only be sent if the communication is local and, + * thus, the message does not have to be serialized. + */ +public final class LocalRpcInvocation implements RpcInvocation { + + private final String methodName; + private final Class<?>[] parameterTypes; + private final Object[] args; + + private transient String toString; + + public LocalRpcInvocation(String methodName, Class<?>[] parameterTypes, Object[] args) { + this.methodName = Preconditions.checkNotNull(methodName); + this.parameterTypes = Preconditions.checkNotNull(parameterTypes); + this.args = args; + + toString = null; + } + + @Override + public String getMethodName() { + return methodName; + } + + @Override + public Class<?>[] getParameterTypes() { + return parameterTypes; + } + + @Override + public Object[] getArgs() { + return args; + } + + @Override + public String toString() { + if (toString == null) { + StringBuilder paramTypeStringBuilder = new StringBuilder(parameterTypes.length * 5); + + if (parameterTypes.length > 0) { + paramTypeStringBuilder.append(parameterTypes[0].getSimpleName()); + + for (int i = 1; i < parameterTypes.length; i++) { + paramTypeStringBuilder + .append(", ") + .append(parameterTypes[i].getSimpleName()); + } + } + + toString = "LocalRpcInvocation(" + methodName + '(' + paramTypeStringBuilder + "))"; + } + + return toString; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/1804aa33/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/RemoteFencedMessage.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/RemoteFencedMessage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/RemoteFencedMessage.java new file mode 100644 index 0000000..5cf9b98 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/RemoteFencedMessage.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rpc.messages; + +import org.apache.flink.util.Preconditions; + +import java.io.Serializable; + +/** + * Remote {@link FencedMessage} implementation. This message is used when the communication + * is remote and thus requires its payload to be serializable. 
+ * + * @param <F> type of the fencing token + * @param <P> type of the payload + */ +public class RemoteFencedMessage<F extends Serializable, P extends Serializable> implements FencedMessage<F, P>, Serializable { + private static final long serialVersionUID = 4043136067468477742L; + + private final F fencingToken; + private final P payload; + + public RemoteFencedMessage(F fencingToken, P payload) { + this.fencingToken = Preconditions.checkNotNull(fencingToken); + this.payload = Preconditions.checkNotNull(payload); + } + + @Override + public F getFencingToken() { + return fencingToken; + } + + @Override + public P getPayload() { + return payload; + } + + @Override + public String toString() { + return "RemoteFencedMessage(" + fencingToken + ", " + payload + ')'; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/1804aa33/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/RemoteRpcInvocation.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/RemoteRpcInvocation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/RemoteRpcInvocation.java new file mode 100644 index 0000000..779d5dd --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/RemoteRpcInvocation.java @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rpc.messages; + +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.SerializedValue; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; + +/** + * Remote rpc invocation message which is used when the actor communication is remote and, thus, the + * message has to be serialized. + * <p> + * In order to fail fast and report an appropriate error message to the user, the method name, the + * parameter types and the arguments are eagerly serialized. In case the invocation call + * contains a non-serializable object, then an {@link IOException} is thrown. 
+ */ +public class RemoteRpcInvocation implements RpcInvocation, Serializable { + private static final long serialVersionUID = 6179354390913843809L; + + // Serialized invocation data + private SerializedValue<RemoteRpcInvocation.MethodInvocation> serializedMethodInvocation; + + // Transient field which is lazily initialized upon first access to the invocation data + private transient RemoteRpcInvocation.MethodInvocation methodInvocation; + + private transient String toString; + + public RemoteRpcInvocation( + final String methodName, + final Class<?>[] parameterTypes, + final Object[] args) throws IOException { + + serializedMethodInvocation = new SerializedValue<>(new RemoteRpcInvocation.MethodInvocation(methodName, parameterTypes, args)); + methodInvocation = null; + } + + @Override + public String getMethodName() throws IOException, ClassNotFoundException { + deserializeMethodInvocation(); + + return methodInvocation.getMethodName(); + } + + @Override + public Class<?>[] getParameterTypes() throws IOException, ClassNotFoundException { + deserializeMethodInvocation(); + + return methodInvocation.getParameterTypes(); + } + + @Override + public Object[] getArgs() throws IOException, ClassNotFoundException { + deserializeMethodInvocation(); + + return methodInvocation.getArgs(); + } + + @Override + public String toString() { + if (toString == null) { + + try { + Class<?>[] parameterTypes = getParameterTypes(); + String methodName = getMethodName(); + + StringBuilder paramTypeStringBuilder = new StringBuilder(parameterTypes.length * 5); + + if (parameterTypes.length > 0) { + paramTypeStringBuilder.append(parameterTypes[0].getSimpleName()); + + for (int i = 1; i < parameterTypes.length; i++) { + paramTypeStringBuilder + .append(", ") + .append(parameterTypes[i].getSimpleName()); + } + } + + toString = "RemoteRpcInvocation(" + methodName + '(' + paramTypeStringBuilder + "))"; + } catch (IOException | ClassNotFoundException e) { + toString = "Could not deserialize 
RemoteRpcInvocation: " + e.getMessage(); + } + } + + return toString; + } + + /** + * Size (#bytes of the serialized data) of the rpc invocation message. + * + * @return Size of the remote rpc invocation message + */ + public long getSize() { + return serializedMethodInvocation.getByteArray().length; + } + + private void deserializeMethodInvocation() throws IOException, ClassNotFoundException { + if (methodInvocation == null) { + methodInvocation = serializedMethodInvocation.deserializeValue(ClassLoader.getSystemClassLoader()); + } + } + + // ------------------------------------------------------------------- + // Serialization methods + // ------------------------------------------------------------------- + + private void writeObject(ObjectOutputStream oos) throws IOException { + oos.writeObject(serializedMethodInvocation); + } + + @SuppressWarnings("unchecked") + private void readObject(ObjectInputStream ois) throws IOException, ClassNotFoundException { + serializedMethodInvocation = (SerializedValue<RemoteRpcInvocation.MethodInvocation>) ois.readObject(); + methodInvocation = null; + } + + // ------------------------------------------------------------------- + // Utility classes + // ------------------------------------------------------------------- + + /** + * Wrapper class for the method invocation information + */ + private static final class MethodInvocation implements Serializable { + private static final long serialVersionUID = 9187962608946082519L; + + private String methodName; + private Class<?>[] parameterTypes; + private Object[] args; + + private MethodInvocation(final String methodName, final Class<?>[] parameterTypes, final Object[] args) { + this.methodName = methodName; + this.parameterTypes = Preconditions.checkNotNull(parameterTypes); + this.args = args; + } + + String getMethodName() { + return methodName; + } + + Class<?>[] getParameterTypes() { + return parameterTypes; + } + + Object[] getArgs() { + return args; + } + + private void 
writeObject(ObjectOutputStream oos) throws IOException { + oos.writeUTF(methodName); + + oos.writeInt(parameterTypes.length); + + for (Class<?> parameterType : parameterTypes) { + oos.writeObject(parameterType); + } + + if (args != null) { + oos.writeBoolean(true); + + for (int i = 0; i < args.length; i++) { + try { + oos.writeObject(args[i]); + } catch (IOException e) { + throw new IOException("Could not serialize " + i + "th argument of method " + + methodName + ". This indicates that the argument type " + + args[i].getClass().getName() + " is not serializable. Arguments have to " + + "be serializable for remote rpc calls.", e); + } + } + } else { + oos.writeBoolean(false); + } + } + + private void readObject(ObjectInputStream ois) throws IOException, ClassNotFoundException { + methodName = ois.readUTF(); + + int length = ois.readInt(); + + parameterTypes = new Class<?>[length]; + + for (int i = 0; i < length; i++) { + try { + parameterTypes[i] = (Class<?>) ois.readObject(); + } catch (IOException e) { + throw new IOException("Could not deserialize " + i + "th parameter type of method " + + methodName + '.', e); + } catch (ClassNotFoundException e) { + throw new ClassNotFoundException("Could not deserialize " + i + "th " + + "parameter type of method " + methodName + ". This indicates that the parameter " + + "type is not part of the system class loader.", e); + } + } + + boolean hasArgs = ois.readBoolean(); + + if (hasArgs) { + args = new Object[length]; + + for (int i = 0; i < length; i++) { + try { + args[i] = ois.readObject(); + } catch (IOException e) { + throw new IOException("Could not deserialize " + i + "th argument of method " + + methodName + '.', e); + } catch (ClassNotFoundException e) { + throw new ClassNotFoundException("Could not deserialize " + i + "th " + + "argument of method " + methodName + ". 
This indicates that the argument " + + "type is not part of the system class loader.", e); + } + } + } else { + args = null; + } + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/1804aa33/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/RpcInvocation.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/RpcInvocation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/RpcInvocation.java new file mode 100644 index 0000000..4e9f629 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/RpcInvocation.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rpc.messages; + +import java.io.IOException; + +/** + * Interface for rpc invocation messages. The interface allows to request all necessary information + * to lookup a method and call it with the corresponding arguments. + */ +public interface RpcInvocation { + + /** + * Returns the method's name. 
+ * + * @return Method name + * @throws IOException if the rpc invocation message is a remote message and could not be deserialized + * @throws ClassNotFoundException if the rpc invocation message is a remote message and contains + * serialized classes which cannot be found on the receiving side + */ + String getMethodName() throws IOException, ClassNotFoundException; + + /** + * Returns the method's parameter types + * + * @return Method's parameter types + * @throws IOException if the rpc invocation message is a remote message and could not be deserialized + * @throws ClassNotFoundException if the rpc invocation message is a remote message and contains + * serialized classes which cannot be found on the receiving side + */ + Class<?>[] getParameterTypes() throws IOException, ClassNotFoundException; + + /** + * Returns the arguments of the remote procedure call + * + * @return Arguments of the remote procedure call + * @throws IOException if the rpc invocation message is a remote message and could not be deserialized + * @throws ClassNotFoundException if the rpc invocation message is a remote message and contains + * serialized classes which cannot be found on the receiving side + */ + Object[] getArgs() throws IOException, ClassNotFoundException; +} http://git-wip-us.apache.org/repos/asf/flink/blob/1804aa33/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/RunAsync.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/RunAsync.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/RunAsync.java new file mode 100644 index 0000000..2f6d867 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/RunAsync.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rpc.messages; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkArgument; + +/** + * Message for asynchronous runnable invocations + */ +public final class RunAsync { + + /** The runnable to be executed. */ + private final Runnable runnable; + + /** The time (as for System.nanoTime()) at which the runnable should be called */ + private final long atTimeNanos; + + /** + * Creates a new {@code RunAsync} message. + * + * @param runnable The Runnable to run. + * @param atTimeNanos The time (as for System.nanoTime()) when to execute the runnable. 
+ */ + public RunAsync(Runnable runnable, long atTimeNanos) { + checkArgument(atTimeNanos >= 0); + this.runnable = checkNotNull(runnable); + this.atTimeNanos = atTimeNanos; + } + + public Runnable getRunnable() { + return runnable; + } + + public long getTimeNanos() { + return atTimeNanos; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/1804aa33/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/Shutdown.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/Shutdown.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/Shutdown.java new file mode 100644 index 0000000..50b076c --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/Shutdown.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rpc.messages; + +import org.apache.flink.runtime.rpc.akka.AkkaRpcService; + +/** + * Shut down message used to trigger the shut down of an AkkaRpcActor. This + * message is only intended for internal use by the {@link AkkaRpcService}. 
+ */ +public final class Shutdown implements ControlMessage { + + private static Shutdown instance = new Shutdown(); + + public static Shutdown getInstance() { + return instance; + } + + private Shutdown() {} +} http://git-wip-us.apache.org/repos/asf/flink/blob/1804aa33/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/UnfencedMessage.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/UnfencedMessage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/UnfencedMessage.java new file mode 100644 index 0000000..27867c4 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/UnfencedMessage.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rpc.messages; + +import org.apache.flink.runtime.rpc.FencedMainThreadExecutable; +import org.apache.flink.util.Preconditions; + +/** + * Wrapper class indicating a message which is not required to match the fencing token + * as it is used by the {@link FencedMainThreadExecutable} to run code in the main thread without + * a valid fencing token. 
This is required for operations which are not scoped by the current + * fencing token (e.g. leadership grants). + * + * <p>IMPORTANT: This message is only intended to be sent locally. + * + * @param <P> type of the payload + */ +public class UnfencedMessage<P> { + private final P payload; + + public UnfencedMessage(P payload) { + this.payload = Preconditions.checkNotNull(payload); + } + + public P getPayload() { + return payload; + } + + @Override + public String toString() { + return "UnfencedMessage(" + payload + ')'; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/1804aa33/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java index 00762b9..f8eca16 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java @@ -23,16 +23,23 @@ import akka.actor.ActorSystem; import org.apache.flink.api.common.time.Time; import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.rpc.akka.AkkaRpcService; +import org.apache.flink.runtime.rpc.exceptions.FencingTokenMismatchException; +import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.TestLogger; import org.junit.AfterClass; import org.junit.Test; +import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Function; import static 
org.junit.Assert.*; @@ -44,6 +51,8 @@ public class AsyncCallsTest extends TestLogger { private static final ActorSystem actorSystem = AkkaUtils.createDefaultActorSystem(); + private static final Time timeout = Time.seconds(10L); + private static final AkkaRpcService akkaRpcService = new AkkaRpcService(actorSystem, Time.milliseconds(10000L)); @@ -162,6 +171,119 @@ public class AsyncCallsTest extends TestLogger { assertTrue("call was not properly delayed", ((stop - start) / 1_000_000) >= delay); } + /** + * Tests that async code is not executed if the fencing token changes. + */ + @Test + public void testRunAsyncWithFencing() throws Exception { + final Time shortTimeout = Time.milliseconds(100L); + final UUID newFencingToken = UUID.randomUUID(); + final CompletableFuture<UUID> resultFuture = new CompletableFuture<>(); + + testRunAsync( + endpoint -> { + endpoint.runAsync( + () -> resultFuture.complete(endpoint.getFencingToken())); + + return resultFuture; + }, + newFencingToken); + + try { + resultFuture.get(shortTimeout.toMilliseconds(), TimeUnit.MILLISECONDS); + + fail("The async run operation should not complete since it is filtered out due to the changed fencing token."); + } catch (TimeoutException ignored) {} + } + + /** + * Tests that code can be executed in the main thread without respecting the fencing token. + */ + @Test + public void testRunAsyncWithoutFencing() throws Exception { + final CompletableFuture<UUID> resultFuture = new CompletableFuture<>(); + final UUID newFencingToken = UUID.randomUUID(); + + testRunAsync( + endpoint -> { + endpoint.runAsyncWithoutFencing( + () -> resultFuture.complete(endpoint.getFencingToken())); + return resultFuture; + }, + newFencingToken); + + assertEquals(newFencingToken, resultFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS)); + } + + /** + * Tests that async callables are not executed if the fencing token changes. 
+ */ + @Test + public void testCallAsyncWithFencing() throws Exception { + final UUID newFencingToken = UUID.randomUUID(); + + CompletableFuture<Boolean> resultFuture = testRunAsync( + endpoint -> endpoint.callAsync(() -> true, timeout), + newFencingToken); + + try { + resultFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); + + fail("The async call operation should fail due to the changed fencing token."); + } catch (ExecutionException e) { + assertTrue(ExceptionUtils.stripExecutionException(e) instanceof FencingTokenMismatchException); + } + } + + /** + * Tests that async callables can be executed in the main thread without checking the fencing token. + */ + @Test + public void testCallAsyncWithoutFencing() throws Exception { + final UUID newFencingToken = UUID.randomUUID(); + + CompletableFuture<Boolean> resultFuture = testRunAsync( + endpoint -> endpoint.callAsyncWithoutFencing(() -> true, timeout), + newFencingToken); + + assertTrue(resultFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS)); + } + + private static <T> CompletableFuture<T> testRunAsync(Function<FencedTestEndpoint, CompletableFuture<T>> runAsyncCall, UUID newFencingToken) throws Exception { + final UUID initialFencingToken = UUID.randomUUID(); + final OneShotLatch enterSetNewFencingToken = new OneShotLatch(); + final OneShotLatch triggerSetNewFencingToken = new OneShotLatch(); + final FencedTestEndpoint fencedTestEndpoint = new FencedTestEndpoint( + akkaRpcService, + initialFencingToken, + enterSetNewFencingToken, + triggerSetNewFencingToken); + final FencedTestGateway fencedTestGateway = fencedTestEndpoint.getSelfGateway(FencedTestGateway.class); + + try { + fencedTestEndpoint.start(); + + CompletableFuture<Acknowledge> newFencingTokenFuture = fencedTestGateway.setNewFencingToken(newFencingToken, timeout); + + assertFalse(newFencingTokenFuture.isDone()); + + assertEquals(initialFencingToken, fencedTestEndpoint.getFencingToken()); + + CompletableFuture<T> result = 
runAsyncCall.apply(fencedTestEndpoint); + + enterSetNewFencingToken.await(); + + triggerSetNewFencingToken.trigger(); + + newFencingTokenFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); + + return result; + } finally { + fencedTestEndpoint.shutDown(); + fencedTestEndpoint.getTerminationFuture().get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); + } + } + // ------------------------------------------------------------------------ // test RPC endpoint // ------------------------------------------------------------------------ @@ -209,4 +331,39 @@ public class AsyncCallsTest extends TestLogger { return concurrentAccess; } } + + public interface FencedTestGateway extends FencedRpcGateway<UUID> { + CompletableFuture<Acknowledge> setNewFencingToken(UUID fencingToken, @RpcTimeout Time timeout); + } + + public static class FencedTestEndpoint extends FencedRpcEndpoint<UUID> implements FencedTestGateway { + + private final OneShotLatch enteringSetNewFencingToken; + private final OneShotLatch triggerSetNewFencingToken; + + protected FencedTestEndpoint( + RpcService rpcService, + UUID initialFencingToken, + OneShotLatch enteringSetNewFencingToken, + OneShotLatch triggerSetNewFencingToken) { + super(rpcService, initialFencingToken); + + this.enteringSetNewFencingToken = enteringSetNewFencingToken; + this.triggerSetNewFencingToken = triggerSetNewFencingToken; + } + + @Override + public CompletableFuture<Acknowledge> setNewFencingToken(UUID fencingToken, Time timeout) { + enteringSetNewFencingToken.trigger(); + try { + triggerSetNewFencingToken.await(); + } catch (InterruptedException e) { + throw new RuntimeException("TriggerSetNewFencingToken OneShotLatch was interrupted."); + } + + setFencingToken(fencingToken); + + return CompletableFuture.completedFuture(Acknowledge.get()); + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/1804aa33/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/FencedRpcEndpointTest.java 
---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/FencedRpcEndpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/FencedRpcEndpointTest.java new file mode 100644 index 0000000..62d5354 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/FencedRpcEndpointTest.java @@ -0,0 +1,344 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.runtime.rpc;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.rpc.exceptions.FencingTokenMismatchException;
+import org.apache.flink.runtime.rpc.exceptions.RpcException;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class FencedRpcEndpointTest extends TestLogger { // tests for fenced RPC endpoints and gateways
+
+    private static final Time timeout = Time.seconds(10L); // default upper bound for RPC calls and blocking future waits in these tests
+    private static RpcService rpcService; // shared by all tests; created in setup(), stopped in teardown()
+
+    @BeforeClass
+    public static void setup() {
+        rpcService = new TestingRpcService();
+    }
+
+    @AfterClass
+    public static void teardown() throws ExecutionException, InterruptedException, TimeoutException {
+        if (rpcService != null) { // nothing to stop if setup() never assigned the service
+            rpcService.stopService();
+            rpcService.getTerminationFuture().get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); // bounded wait for termination
+        }
+    }
+
+    /**
+     * Tests that the fencing token can be retrieved from the FencedRpcEndpoint and self
+     * FencedRpcGateway. Moreover it tests that you can only set the fencing token from
+     * the main thread.
+     */
+    @Test
+    public void testFencingTokenSetting() throws Exception {
+        final UUID initialFencingToken = UUID.randomUUID();
+        final String value = "foobar";
+        FencedTestingEndpoint fencedTestingEndpoint = new FencedTestingEndpoint(rpcService, initialFencingToken, value);
+        FencedTestingGateway fencedTestingGateway = fencedTestingEndpoint.getSelfGateway(FencedTestingGateway.class); // used below to issue rpcSetFencingToken
+        FencedTestingGateway fencedGateway = fencedTestingEndpoint.getSelfGateway(FencedTestingGateway.class); // obtained the same way; used below to read the fencing token
+
+        try {
+            fencedTestingEndpoint.start();
+
+            assertEquals(initialFencingToken, fencedGateway.getFencingToken());
+            assertEquals(initialFencingToken, fencedTestingEndpoint.getFencingToken());
+
+            final UUID newFencingToken = UUID.randomUUID();
+
+            try {
+                fencedTestingEndpoint.setFencingToken(newFencingToken); // called directly from the test thread, i.e. not via an RPC
+                fail("Fencing token can only be set from within the main thread.");
+            } catch (AssertionError ignored) { // NOTE(review): JUnit's fail() also throws AssertionError, so the fail above is swallowed here too
+                // expected to fail
+            }
+
+            assertEquals(initialFencingToken, fencedTestingEndpoint.getFencingToken()); // effective guard: the direct call must not have changed the token
+
+            CompletableFuture<Acknowledge> setFencingFuture = fencedTestingGateway.rpcSetFencingToken(newFencingToken, timeout);
+
+            // wait for the completion of the set fencing token operation
+            setFencingFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+
+            // self gateway should adapt its fencing token
+            assertEquals(newFencingToken, fencedGateway.getFencingToken());
+            assertEquals(newFencingToken, fencedTestingEndpoint.getFencingToken());
+        } finally {
+            fencedTestingEndpoint.shutDown();
+            fencedTestingEndpoint.getTerminationFuture().get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+        }
+    }
+
+    /**
+     * Tests that messages with the wrong fencing token are filtered out.
+     */
+    @Test
+    public void testFencing() throws Exception {
+        final UUID initialFencingToken = UUID.randomUUID();
+        final UUID wrongFencingToken = UUID.randomUUID();
+        final String value = "barfoo";
+        FencedTestingEndpoint fencedTestingEndpoint = new FencedTestingEndpoint(rpcService, initialFencingToken, value);
+
+        try {
+            fencedTestingEndpoint.start();
+
+            final FencedTestingGateway properFencedGateway = rpcService.connect(fencedTestingEndpoint.getAddress(), initialFencingToken, FencedTestingGateway.class)
+                .get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+            final FencedTestingGateway wronglyFencedGateway = rpcService.connect(fencedTestingEndpoint.getAddress(), wrongFencingToken, FencedTestingGateway.class)
+                .get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+
+            assertEquals(value, properFencedGateway.foobar(timeout).get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS));
+
+            try {
+                wronglyFencedGateway.foobar(timeout).get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+                fail("This should fail since we have the wrong fencing token.");
+            } catch (ExecutionException e) {
+                assertTrue(ExceptionUtils.stripExecutionException(e) instanceof FencingTokenMismatchException);
+            }
+
+            final UUID newFencingToken = UUID.randomUUID();
+
+            CompletableFuture<Acknowledge> newFencingTokenFuture = properFencedGateway.rpcSetFencingToken(newFencingToken, timeout);
+
+            // wait for the new fencing token to be set
+            newFencingTokenFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+
+            // this should no longer work because of the new fencing token
+            try {
+                properFencedGateway.foobar(timeout).get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+
+                fail("This should fail since we have the wrong fencing token by now.");
+            } catch (ExecutionException e) {
+                assertTrue(ExceptionUtils.stripExecutionException(e) instanceof FencingTokenMismatchException);
+            }
+
+        } finally {
+            fencedTestingEndpoint.shutDown();
+            fencedTestingEndpoint.getTerminationFuture().get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+        }
+    }
+
+    /**
+     * Tests that the self gateway always uses the current fencing token whereas the remote
+     * gateway has a fixed fencing token.
+     */
+    @Test
+    public void testRemoteAndSelfGateways() throws Exception {
+        final UUID initialFencingToken = UUID.randomUUID();
+        final UUID newFencingToken = UUID.randomUUID();
+        final String value = "foobar";
+
+        final FencedTestingEndpoint fencedTestingEndpoint = new FencedTestingEndpoint(rpcService, initialFencingToken, value);
+
+        try {
+            fencedTestingEndpoint.start();
+
+            FencedTestingGateway selfGateway = fencedTestingEndpoint.getSelfGateway(FencedTestingGateway.class);
+            FencedTestingGateway remoteGateway = rpcService.connect(fencedTestingEndpoint.getAddress(), initialFencingToken, FencedTestingGateway.class)
+                .get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+
+            assertEquals(initialFencingToken, selfGateway.getFencingToken());
+            assertEquals(initialFencingToken, remoteGateway.getFencingToken());
+
+            assertEquals(value, selfGateway.foobar(timeout).get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS));
+            assertEquals(value, remoteGateway.foobar(timeout).get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS));
+
+            CompletableFuture<Acknowledge> newFencingTokenFuture = selfGateway.rpcSetFencingToken(newFencingToken, timeout);
+
+            // wait for the new fencing token to be set
+            newFencingTokenFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+
+            assertEquals(newFencingToken, selfGateway.getFencingToken());
+            assertNotEquals(newFencingToken, remoteGateway.getFencingToken());
+
+            assertEquals(value, selfGateway.foobar(timeout).get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS));
+
+            try {
+                remoteGateway.foobar(timeout).get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+                fail("This should have failed because we don't have the right fencing token.");
+            } catch (ExecutionException e) {
+                assertTrue(ExceptionUtils.stripExecutionException(e) instanceof FencingTokenMismatchException);
+            }
+        } finally {
+            fencedTestingEndpoint.shutDown();
+            fencedTestingEndpoint.getTerminationFuture().get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+        }
+    }
+
+    /**
+     * Tests that calls via the MainThreadExecutor fail after the fencing token changes.
+     */
+    @Test
+    public void testMainThreadExecutorUnderChangingFencingToken() throws Exception {
+        final Time shortTimeout = Time.milliseconds(100L); // how long the test waits before concluding the computation never completes
+        final UUID initialFencingToken = UUID.randomUUID();
+        final String value = "foobar";
+        final FencedTestingEndpoint fencedTestingEndpoint = new FencedTestingEndpoint(rpcService, initialFencingToken, value);
+
+        try {
+            fencedTestingEndpoint.start();
+
+            FencedTestingGateway selfGateway = fencedTestingEndpoint.getSelfGateway(FencedTestingGateway.class);
+
+            CompletableFuture<Acknowledge> mainThreadExecutorComputation = selfGateway.triggerMainThreadExecutorComputation(timeout);
+
+            // we know that subsequent calls on the same gateway are executed sequentially
+            // therefore, we know that the change fencing token call is executed after the trigger MainThreadExecutor
+            // computation
+            final UUID newFencingToken = UUID.randomUUID();
+            CompletableFuture<Acknowledge> newFencingTokenFuture = selfGateway.rpcSetFencingToken(newFencingToken, timeout);
+
+            newFencingTokenFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+
+            // trigger the computation
+            CompletableFuture<Acknowledge> triggerFuture = selfGateway.triggerComputationLatch(timeout);
+
+            triggerFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+
+            // wait for the main thread executor computation to fail
+            try {
+                mainThreadExecutorComputation.get(shortTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+                fail("The MainThreadExecutor computation should not be able to complete because it was filtered out, leading to a timeout exception.");
+            } catch (TimeoutException ignored) {
+                // as predicted
+            }
+
+        } finally {
+            fencedTestingEndpoint.shutDown();
+            fencedTestingEndpoint.getTerminationFuture().get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+        }
+    }
+
+    /**
+     * Tests that all calls from an unfenced remote gateway are ignored and that one cannot obtain
+     * the fencing token from such a gateway.
+     */
+    @Test
+    public void testUnfencedRemoteGateway() throws Exception {
+        final UUID initialFencingToken = UUID.randomUUID();
+        final String value = "foobar";
+
+        final FencedTestingEndpoint fencedTestingEndpoint = new FencedTestingEndpoint(rpcService, initialFencingToken, value);
+
+        try {
+            fencedTestingEndpoint.start();
+
+            FencedTestingGateway unfencedGateway = rpcService.connect(fencedTestingEndpoint.getAddress(), FencedTestingGateway.class)
+                .get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+
+            try {
+                unfencedGateway.foobar(timeout).get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+                fail("This should have failed because we have an unfenced gateway.");
+            } catch (ExecutionException e) {
+                assertTrue(ExceptionUtils.stripExecutionException(e) instanceof RpcException);
+            }
+
+            try {
+                unfencedGateway.getFencingToken();
+                fail("We should not be able to call getFencingToken on an unfenced gateway.");
+            } catch (UnsupportedOperationException ignored) {
+                // we should not be able to call getFencingToken on an unfenced gateway
+            }
+        } finally {
+            fencedTestingEndpoint.shutDown();
+            fencedTestingEndpoint.getTerminationFuture().get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+        }
+    }
+
+    public interface FencedTestingGateway extends FencedRpcGateway<UUID> { // RPC surface exercised by the tests above
+        CompletableFuture<String> foobar(@RpcTimeout Time timeout);
+
+        CompletableFuture<Acknowledge> rpcSetFencingToken(UUID fencingToken, @RpcTimeout Time timeout);
+
+        CompletableFuture<Acknowledge> triggerMainThreadExecutorComputation(@RpcTimeout Time timeout);
+
+        CompletableFuture<Acknowledge> triggerComputationLatch(@RpcTimeout Time timeout);
+    }
+
+    private static class FencedTestingEndpoint extends FencedRpcEndpoint<UUID> implements FencedTestingGateway { // endpoint under test; answers foobar with a fixed value
+
+        private final OneShotLatch computationLatch; // released by triggerComputationLatch
+
+        private final String value; // returned by foobar
+
+        protected FencedTestingEndpoint(RpcService rpcService, UUID initialFencingToken, String value) {
+            super(rpcService, initialFencingToken);
+
+            computationLatch = new OneShotLatch();
+
+            this.value = value;
+        }
+
+        @Override
+        public CompletableFuture<String> foobar(Time timeout) {
+            return CompletableFuture.completedFuture(value);
+        }
+
+        @Override
+        public CompletableFuture<Acknowledge> rpcSetFencingToken(UUID fencingToken, Time timeout) {
+            setFencingToken(fencingToken);
+
+            return CompletableFuture.completedFuture(Acknowledge.get());
+        }
+
+        @Override
+        public CompletableFuture<Acknowledge> triggerMainThreadExecutorComputation(Time timeout) {
+            return CompletableFuture.supplyAsync(
+                () -> {
+                    try {
+                        computationLatch.await(); // blocks until triggerComputationLatch has been called
+                    } catch (InterruptedException e) {
+                        throw new FlinkFutureException("Waiting on latch failed.", e);
+                    }
+
+                    return value;
+                },
+                getRpcService().getExecutor()) // first stage runs on the RpcService executor
+            .thenApplyAsync(
+                (String v) -> Acknowledge.get(),
+                getMainThreadExecutor()); // second stage is handed to the endpoint's main thread executor
+        }
+
+        @Override
+        public CompletableFuture<Acknowledge> triggerComputationLatch(Time timeout) {
+            computationLatch.trigger();
+
+            return CompletableFuture.completedFuture(Acknowledge.get());
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/1804aa33/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
index 14cf35a..4b9f397 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
 
+import java.io.Serializable;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -104,7 +105,27 @@ public class TestingRpcService extends AkkaRpcService {
                 return FutureUtils.completedExceptionally(new Exception("Gateway registered under " + address + " is not of type " + clazz));
             }
         } else {
-            return FutureUtils.completedExceptionally(new Exception("No gateway registered under " + address + '.'));
+            return super.connect(address, clazz); // no registered gateway: fall back to the parent implementation
+        }
+    }
+
+    @Override
+    public <F extends Serializable, C extends FencedRpcGateway<F>> CompletableFuture<C> connect(
+            String address,
+            F fencingToken,
+            Class<C> clazz) {
+        RpcGateway gateway = registeredConnections.get(address); // explicitly registered gateways take precedence
+
+        if (gateway != null) {
+            if (clazz.isAssignableFrom(gateway.getClass())) {
+                @SuppressWarnings("unchecked")
+                C typedGateway = (C) gateway;
+                return CompletableFuture.completedFuture(typedGateway); // returned as-is; fencingToken is not consulted on this path
+            } else {
+                return FutureUtils.completedExceptionally(new Exception("Gateway registered under " + address + " is not of type " + clazz));
+            }
+        } else {
+            return super.connect(address, fencingToken, clazz); // no registered gateway: fall back to the parent implementation
         }
     }