[FLINK-4954] [rpc] Discard messages when AkkaRpcActor is in state Processing.STOP
When the AkkaRpcActor receives a message while it is in state Processing.STOP, it discards the message and, if the message has a sender, sends an AkkaRpcException back to the caller. This replaces the old stashing behaviour, which was only a best-effort attempt to retain all received messages. Distributed components should not rely on such best-effort retention, which is why stashing has been replaced with discarding messages. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/006a19d4 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/006a19d4 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/006a19d4 Branch: refs/heads/master Commit: 006a19d4e601a0917e073215d866b5b87dce375f Parents: 522edae Author: Till Rohrmann <[email protected]> Authored: Fri Oct 28 14:07:28 2016 +0200 Committer: Stephan Ewen <[email protected]> Committed: Fri Dec 23 20:54:25 2016 +0100 ---------------------------------------------------------------------- .../flink/runtime/rpc/akka/AkkaRpcActor.java | 23 ++++++----- .../rpc/akka/exceptions/AkkaRpcException.java | 41 ++++++++++++++++++++ .../rpc/exceptions/RpcConnectionException.java | 4 +- .../runtime/rpc/exceptions/RpcException.java | 39 +++++++++++++++++++ .../runtime/rpc/akka/AkkaRpcActorTest.java | 21 ++++++++-- 5 files changed, 112 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/006a19d4/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java index c21383a..fe6b23b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java @@ -20,7 +20,7 @@ package org.apache.flink.runtime.rpc.akka; import akka.actor.ActorRef; import akka.actor.Status; -import akka.actor.UntypedActorWithStash; +import akka.actor.UntypedActor; import akka.dispatch.Futures; import akka.japi.Procedure; import akka.pattern.Patterns; @@ -30,6 +30,7 @@ import org.apache.flink.runtime.concurrent.impl.FlinkFuture; import org.apache.flink.runtime.rpc.MainThreadValidatorUtil; import org.apache.flink.runtime.rpc.RpcEndpoint; import org.apache.flink.runtime.rpc.RpcGateway; +import org.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcException; import org.apache.flink.runtime.rpc.akka.messages.CallAsync; import org.apache.flink.runtime.rpc.akka.messages.LocalRpcInvocation; import org.apache.flink.runtime.rpc.akka.messages.Processing; @@ -60,14 +61,14 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * in the context of the actor thread. * <p> * The {@link Processing} message controls the processing behaviour of the akka rpc actor. A - * {@link Processing#START} message unstashes all stashed messages and starts processing incoming - * messages. A {@link Processing#STOP} message stops processing messages and stashes incoming - * messages. + * {@link Processing#START} starts processing incoming messages. A {@link Processing#STOP} message + * stops processing messages. All messages which arrive when the processing is stopped, will be + * discarded. 
* * @param <C> Type of the {@link RpcGateway} associated with the {@link RpcEndpoint} * @param <T> Type of the {@link RpcEndpoint} */ -class AkkaRpcActor<C extends RpcGateway, T extends RpcEndpoint<C>> extends UntypedActorWithStash { +class AkkaRpcActor<C extends RpcGateway, T extends RpcEndpoint<C>> extends UntypedActor { private static final Logger LOG = LoggerFactory.getLogger(AkkaRpcActor.class); @@ -86,7 +87,7 @@ class AkkaRpcActor<C extends RpcGateway, T extends RpcEndpoint<C>> extends Untyp } @Override - public void postStop() { + public void postStop() throws Exception { super.postStop(); // IMPORTANT: This only works if we don't use a restarting supervisor strategy. Otherwise @@ -99,7 +100,6 @@ class AkkaRpcActor<C extends RpcGateway, T extends RpcEndpoint<C>> extends Untyp @Override public void onReceive(final Object message) { if (message.equals(Processing.START)) { - unstashAll(); getContext().become(new Procedure<Object>() { @Override public void apply(Object msg) throws Exception { @@ -111,10 +111,15 @@ class AkkaRpcActor<C extends RpcGateway, T extends RpcEndpoint<C>> extends Untyp } }); } else { - LOG.info("The rpc endpoint {} has not been started yet. Stashing message {} until processing is started.", + LOG.info("The rpc endpoint {} has not been started yet. 
Discarding message {} until processing is started.", rpcEndpoint.getClass().getName(), message.getClass().getName()); - stash(); + + if (!getSender().equals(ActorRef.noSender())) { + // fail a possible future if we have a sender + getSender().tell(new Status.Failure(new AkkaRpcException("Discard message, because " + + "the rpc endpoint has not been started yet.")), getSelf()); + } } } http://git-wip-us.apache.org/repos/asf/flink/blob/006a19d4/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/exceptions/AkkaRpcException.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/exceptions/AkkaRpcException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/exceptions/AkkaRpcException.java new file mode 100644 index 0000000..f0d6548 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/exceptions/AkkaRpcException.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rpc.akka.exceptions; + +import org.apache.flink.runtime.rpc.exceptions.RpcException; + +/** + * Base class for Akka RPC related exceptions. 
+ */ +public class AkkaRpcException extends RpcException { + + private static final long serialVersionUID = -3796329968494146418L; + + public AkkaRpcException(String message) { + super(message); + } + + public AkkaRpcException(String message, Throwable cause) { + super(message, cause); + } + + public AkkaRpcException(Throwable cause) { + super(cause); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/006a19d4/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/exceptions/RpcConnectionException.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/exceptions/RpcConnectionException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/exceptions/RpcConnectionException.java index a22ebe7..4eaf34f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/exceptions/RpcConnectionException.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/exceptions/RpcConnectionException.java @@ -18,13 +18,11 @@ package org.apache.flink.runtime.rpc.exceptions; -import java.util.concurrent.ExecutionException; - /** * Exception class which is thrown if a rpc connection failed. Usually this happens if the remote * host cannot be reached. 
*/ -public class RpcConnectionException extends ExecutionException { +public class RpcConnectionException extends RpcException { private static final long serialVersionUID = -5500560405481142472L; public RpcConnectionException(String message) { http://git-wip-us.apache.org/repos/asf/flink/blob/006a19d4/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/exceptions/RpcException.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/exceptions/RpcException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/exceptions/RpcException.java new file mode 100644 index 0000000..652b3f5 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/exceptions/RpcException.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rpc.exceptions; + +/** + * Base class for RPC related exceptions. 
+ */ +public class RpcException extends Exception { + + private static final long serialVersionUID = -7163591879289483630L; + + public RpcException(String message) { + super(message); + } + + public RpcException(String message, Throwable cause) { + super(message, cause); + } + + public RpcException(Throwable cause) { + super(cause); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/006a19d4/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java index d2dbab7..760e1a7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java @@ -27,6 +27,7 @@ import org.apache.flink.runtime.rpc.RpcEndpoint; import org.apache.flink.runtime.rpc.RpcGateway; import org.apache.flink.runtime.rpc.RpcMethod; import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcException; import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException; import org.apache.flink.util.TestLogger; import org.hamcrest.core.Is; @@ -95,26 +96,38 @@ public class AkkaRpcActorTest extends TestLogger { } /** - * Tests that the {@link AkkaRpcActor} stashes messages until the corresponding + * Tests that the {@link AkkaRpcActor} discards messages until the corresponding * {@link RpcEndpoint} has been started. 
*/ @Test - public void testMessageStashing() throws Exception { + public void testMessageDiscarding() throws Exception { int expectedValue = 1337; DummyRpcEndpoint rpcEndpoint = new DummyRpcEndpoint(akkaRpcService); DummyRpcGateway rpcGateway = rpcEndpoint.getSelf(); - // this message should not be processed until we've started the rpc endpoint + // this message should be discarded and completed with an AkkaRpcException Future<Integer> result = rpcGateway.foobar(); + try { + result.get(timeout.getSize(), timeout.getUnit()); + fail("Expected an AkkaRpcException."); + } catch (ExecutionException ee) { + // expected this exception, because the endpoint has not been started + assertTrue(ee.getCause() instanceof AkkaRpcException); + } + // set a new value which we expect to be returned rpcEndpoint.setFoobar(expectedValue); - // now process the rpc + // start the endpoint so that it can process messages rpcEndpoint.start(); + // send the rpc again + result = rpcGateway.foobar(); + + // now we should receive a result :-) Integer actualValue = result.get(timeout.getSize(), timeout.getUnit()); assertThat("The new foobar value should have been returned.", actualValue, Is.is(expectedValue));
