This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch test
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 07061bf0c527d70221a70b3f24da3d93556c485a
Author: zentol <[email protected]>
AuthorDate: Wed Mar 20 16:13:31 2019 +0100

    Bump Akka to 2.5.21
---
 .../apache/flink/runtime/rpc/akka/AkkaRpcActor.java | 21 +++++++++++++--------
 .../flink/runtime/rpc/akka/AkkaRpcServiceTest.java  |  6 +++---
 pom.xml                                             |  2 +-
 3 files changed, 17 insertions(+), 12 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
index b434001..39cf6d1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
@@ -38,9 +38,10 @@ import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedValue;
 
+import akka.actor.AbstractActor;
 import akka.actor.ActorRef;
 import akka.actor.Status;
-import akka.actor.UntypedActor;
+import akka.japi.pf.ReceiveBuilder;
 import akka.pattern.Patterns;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -79,7 +80,7 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  *
  * @param <T> Type of the {@link RpcEndpoint}
  */
-class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends UntypedActor {
+class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends AbstractActor {
 
        protected final Logger log = LoggerFactory.getLogger(getClass());
 
@@ -135,12 +136,16 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> 
extends UntypedActor {
        }
 
        @Override
-       public void onReceive(final Object message) {
-               if (message instanceof RemoteHandshakeMessage) {
-                       handleHandshakeMessage((RemoteHandshakeMessage) 
message);
-               } else if (message instanceof ControlMessages) {
-                       handleControlMessage(((ControlMessages) message));
-               } else if (state.isRunning()) {
+       public Receive createReceive() {
+               return ReceiveBuilder.create()
+                       .match(RemoteHandshakeMessage.class, 
this::handleHandshakeMessage)
+                       .match(ControlMessages.class, 
this::handleControlMessage)
+                       .matchAny(this::handleMessage)
+                       .build();
+       }
+
+       private void handleMessage(final Object message) {
+               if (state.isRunning()) {
                        mainThreadValidator.enterMainThread();
 
                        try {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
index 4be670b..bc1093d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
@@ -324,7 +324,7 @@ public class AkkaRpcServiceTest extends TestLogger {
                        }
 
                        terminationFuture.get();
-                       
assertThat(akkaRpcService.getActorSystem().isTerminated(), is(true));
+                       
assertThat(akkaRpcService.getActorSystem().whenTerminated().isCompleted(), 
is(true));
                } finally {
                        RpcUtils.terminateRpcService(akkaRpcService, TIMEOUT);
                }
@@ -363,7 +363,7 @@ public class AkkaRpcServiceTest extends TestLogger {
                        assertThat(ExceptionUtils.findThrowable(e, 
OnStopException.class).isPresent(), is(true));
                }
 
-               assertThat(akkaRpcService.getActorSystem().isTerminated(), 
is(true));
+               
assertThat(akkaRpcService.getActorSystem().whenTerminated().isCompleted(), 
is(true));
        }
 
        private Collection<CompletableFuture<Void>> 
startStopNCountingAsynchronousOnStopEndpoints(AkkaRpcService akkaRpcService, 
int numberActors) throws Exception {
@@ -381,7 +381,7 @@ public class AkkaRpcServiceTest extends TestLogger {
                CompletableFuture<Void> terminationFuture = 
akkaRpcService.stopService();
 
                assertThat(terminationFuture.isDone(), is(false));
-               assertThat(akkaRpcService.getActorSystem().isTerminated(), 
is(false));
+               
assertThat(akkaRpcService.getActorSystem().whenTerminated().isCompleted(), 
is(false));
 
                countDownLatch.await();
 
diff --git a/pom.xml b/pom.xml
index f30ea67..f48c2ee 100644
--- a/pom.xml
+++ b/pom.xml
@@ -102,7 +102,7 @@ under the License.
                <log4j.configuration>log4j-test.properties</log4j.configuration>
                <flink.shaded.version>6.0</flink.shaded.version>
                <guava.version>18.0</guava.version>
-               <akka.version>2.4.20</akka.version>
+               <akka.version>2.5.21</akka.version>
                <java.version>1.8</java.version>
                <slf4j.version>1.7.15</slf4j.version>
                <log4j.version>1.2.17</log4j.version>

Reply via email to