This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 8773895b6b7cf75ee642ebd1496c0566a0ddaa40
Author: Till Rohrmann <[email protected]>
AuthorDate: Fri Mar 27 14:34:07 2020 +0100

    [FLINK-16703][rpc] Set AkkaRpcActor state to TERMINATING when terminating

    This commit fixes a bug where we did not update the state of the AkkaRpcActor
    in case of terminating it. Moreover, this commit fixes the problem that the
    onStop action could have been called multiple times. Last but not least, it
    changes the enum names of the state implementations for better diagnostics.

    This closes #11549.
---
 .../flink/runtime/rpc/akka/AkkaRpcActor.java | 5 ++
 .../flink/runtime/rpc/akka/AkkaRpcActorTest.java | 64 ++++++++++++++++++++++
 2 files changed, 69 insertions(+)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
index 0397172..7ad3776 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
@@ -578,6 +578,11 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends AbstractActor {
 		TERMINATING;
 
 		@Override
+		public State terminate(AkkaRpcActor<?> akkaRpcActor) {
+			return TERMINATING;
+		}
+
+		@Override
 		public boolean isRunning() {
 			return true;
 		}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
index 0597b01..64270c4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
@@ -36,6 +36,7 @@ import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TestLogger;
 
+import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import org.hamcrest.core.Is;
 import org.junit.AfterClass;
@@ -395,6 +396,38 @@ public class AkkaRpcActorTest extends TestLogger {
 		}
 	}
 
+	/**
+	 * Tests that multiple termination calls won't trigger the onStop action multiple times.
+	 * Note that this test is a probabilistic test which only fails sometimes without the fix.
+	 * See FLINK-16703.
+	 */
+	@Test
+	public void callsOnStopOnlyOnce() throws Exception {
+		final CompletableFuture<Void> onStopFuture = new CompletableFuture<>();
+		final OnStopCountingRpcEndpoint endpoint = new OnStopCountingRpcEndpoint(akkaRpcService, onStopFuture);
+
+		try {
+			endpoint.start();
+
+			final AkkaBasedEndpoint selfGateway = endpoint.getSelfGateway(AkkaBasedEndpoint.class);
+
+			// try to terminate the actor twice
+			selfGateway.getActorRef().tell(ControlMessages.TERMINATE, ActorRef.noSender());
+			selfGateway.getActorRef().tell(ControlMessages.TERMINATE, ActorRef.noSender());
+
+			endpoint.waitUntilOnStopHasBeenCalled();
+
+			onStopFuture.complete(null);
+
+			endpoint.getTerminationFuture().get();
+
+			assertThat(endpoint.getNumOnStopCalls(), is(1));
+		} finally {
+			onStopFuture.complete(null);
+			RpcUtils.terminateRpcEndpoint(endpoint, timeout);
+		}
+	}
+
 	// ------------------------------------------------------------------------
 	// Test Actors and Interfaces
 	// ------------------------------------------------------------------------
@@ -609,4 +642,35 @@ public class AkkaRpcActorTest extends TestLogger {
 			countDownLatch.await();
 		}
 	}
+
+	// ------------------------------------------------------------------------
+
+	private static final class OnStopCountingRpcEndpoint extends RpcEndpoint {
+
+		private final AtomicInteger numOnStopCalls = new AtomicInteger(0);
+
+		private final OneShotLatch onStopHasBeenCalled = new OneShotLatch();
+
+		private final CompletableFuture<Void> onStopFuture;
+
+		private OnStopCountingRpcEndpoint(RpcService rpcService, CompletableFuture<Void> onStopFuture) {
+			super(rpcService);
+			this.onStopFuture = onStopFuture;
+		}
+
+		@Override
+		protected CompletableFuture<Void> onStop() {
+			onStopHasBeenCalled.trigger();
+			numOnStopCalls.incrementAndGet();
+			return onStopFuture;
+		}
+
+		private int getNumOnStopCalls() {
+			return numOnStopCalls.get();
+		}
+
+		private void waitUntilOnStopHasBeenCalled() throws InterruptedException {
+			onStopHasBeenCalled.await();
+		}
+	}
 }
