This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b71c4002d3942f48807d2b5c0ac64ee3e55b2f59
Author: Till Rohrmann <[email protected]>
AuthorDate: Fri Mar 27 14:34:07 2020 +0100

    [FLINK-16703][rpc] Set AkkaRpcActor state to TERMINATING when terminating
    
    This commit fixes a bug where we did not update the state of the 
AkkaRpcActor
    in case of terminating it. Moreover, this commit fixes the problem that the
    onStop action could have been called multiple times. Last but not least, it
    changes the enum names of the state implementations for better diagnostics.
    
    This closes #11549.
---
 .../flink/runtime/rpc/akka/AkkaRpcActor.java       |  5 ++
 .../flink/runtime/rpc/akka/AkkaRpcActorTest.java   | 64 ++++++++++++++++++++++
 2 files changed, 69 insertions(+)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
index 0397172..7ad3776 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
@@ -578,6 +578,11 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> 
extends AbstractActor {
                TERMINATING;
 
                @Override
+               public State terminate(AkkaRpcActor<?> akkaRpcActor) {
+                       return TERMINATING;
+               }
+
+               @Override
                public boolean isRunning() {
                        return true;
                }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
index c010810..c912c79 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
@@ -36,6 +36,7 @@ import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TestLogger;
 
+import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import org.hamcrest.core.Is;
 import org.junit.AfterClass;
@@ -397,6 +398,38 @@ public class AkkaRpcActorTest extends TestLogger {
                }
        }
 
+       /**
+        * Tests that multiple termination calls won't trigger the onStop 
action multiple times.
+        * Note that this test is a probabilistic test which only fails 
sometimes without the fix.
+        * See FLINK-16703.
+        */
+       @Test
+       public void callsOnStopOnlyOnce() throws Exception {
+               final CompletableFuture<Void> onStopFuture = new 
CompletableFuture<>();
+               final OnStopCountingRpcEndpoint endpoint = new 
OnStopCountingRpcEndpoint(akkaRpcService, onStopFuture);
+
+               try {
+                       endpoint.start();
+
+                       final AkkaBasedEndpoint selfGateway = 
endpoint.getSelfGateway(AkkaBasedEndpoint.class);
+
+                       // try to terminate the actor twice
+                       
selfGateway.getActorRef().tell(ControlMessages.TERMINATE, ActorRef.noSender());
+                       
selfGateway.getActorRef().tell(ControlMessages.TERMINATE, ActorRef.noSender());
+
+                       endpoint.waitUntilOnStopHasBeenCalled();
+
+                       onStopFuture.complete(null);
+
+                       endpoint.getTerminationFuture().get();
+
+                       assertThat(endpoint.getNumOnStopCalls(), is(1));
+               } finally {
+                       onStopFuture.complete(null);
+                       RpcUtils.terminateRpcEndpoint(endpoint, timeout);
+               }
+       }
+
        // 
------------------------------------------------------------------------
        //  Test Actors and Interfaces
        // 
------------------------------------------------------------------------
@@ -611,4 +644,35 @@ public class AkkaRpcActorTest extends TestLogger {
                        countDownLatch.await();
                }
        }
+
+       // 
------------------------------------------------------------------------
+
+       private static final class OnStopCountingRpcEndpoint extends 
RpcEndpoint {
+
+               private final AtomicInteger numOnStopCalls = new 
AtomicInteger(0);
+
+               private final OneShotLatch onStopHasBeenCalled = new 
OneShotLatch();
+
+               private final CompletableFuture<Void> onStopFuture;
+
+               private OnStopCountingRpcEndpoint(RpcService rpcService, 
CompletableFuture<Void> onStopFuture) {
+                       super(rpcService);
+                       this.onStopFuture = onStopFuture;
+               }
+
+               @Override
+               protected CompletableFuture<Void> onStop() {
+                       onStopHasBeenCalled.trigger();
+                       numOnStopCalls.incrementAndGet();
+                       return onStopFuture;
+               }
+
+               private int getNumOnStopCalls() {
+                       return numOnStopCalls.get();
+               }
+
+               private void waitUntilOnStopHasBeenCalled() throws 
InterruptedException {
+                       onStopHasBeenCalled.await();
+               }
+       }
 }

Reply via email to