This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch test in repository https://gitbox.apache.org/repos/asf/flink.git
commit 07061bf0c527d70221a70b3f24da3d93556c485a
Author: zentol <[email protected]>
AuthorDate: Wed Mar 20 16:13:31 2019 +0100

    Bump Akka to 2.5.21
---
 .../apache/flink/runtime/rpc/akka/AkkaRpcActor.java | 21 +++++++++++++--------
 .../flink/runtime/rpc/akka/AkkaRpcServiceTest.java  |  6 +++---
 pom.xml                                             |  2 +-
 3 files changed, 17 insertions(+), 12 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
index b434001..39cf6d1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
@@ -38,9 +38,10 @@ import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedValue;
 
+import akka.actor.AbstractActor;
 import akka.actor.ActorRef;
 import akka.actor.Status;
-import akka.actor.UntypedActor;
+import akka.japi.pf.ReceiveBuilder;
 import akka.pattern.Patterns;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -79,7 +80,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  *
  * @param <T> Type of the {@link RpcEndpoint}
  */
-class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends UntypedActor {
+class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends AbstractActor {
 
 	protected final Logger log = LoggerFactory.getLogger(getClass());
 
@@ -135,12 +136,16 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends UntypedActor {
 	}
 
 	@Override
-	public void onReceive(final Object message) {
-		if (message instanceof RemoteHandshakeMessage) {
-			handleHandshakeMessage((RemoteHandshakeMessage) message);
-		} else if (message instanceof ControlMessages) {
-			handleControlMessage(((ControlMessages) message));
-		} else if (state.isRunning()) {
+	public Receive createReceive() {
+		return ReceiveBuilder.create()
+			.match(RemoteHandshakeMessage.class, this::handleHandshakeMessage)
+			.match(ControlMessages.class, this::handleControlMessage)
+			.matchAny(this::handleMessage)
+			.build();
+	}
+
+	private void handleMessage(final Object message) {
+		if (state.isRunning()) {
 			mainThreadValidator.enterMainThread();
 
 			try {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
index 4be670b..bc1093d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
@@ -324,7 +324,7 @@ public class AkkaRpcServiceTest extends TestLogger {
 			}
 
 			terminationFuture.get();
-			assertThat(akkaRpcService.getActorSystem().isTerminated(), is(true));
+			assertThat(akkaRpcService.getActorSystem().whenTerminated().isCompleted(), is(true));
 		} finally {
 			RpcUtils.terminateRpcService(akkaRpcService, TIMEOUT);
 		}
@@ -363,7 +363,7 @@ public class AkkaRpcServiceTest extends TestLogger {
 			assertThat(ExceptionUtils.findThrowable(e, OnStopException.class).isPresent(), is(true));
 		}
 
-		assertThat(akkaRpcService.getActorSystem().isTerminated(), is(true));
+		assertThat(akkaRpcService.getActorSystem().whenTerminated().isCompleted(), is(true));
 	}
 
 	private Collection<CompletableFuture<Void>> startStopNCountingAsynchronousOnStopEndpoints(AkkaRpcService akkaRpcService, int numberActors) throws Exception {
@@ -381,7 +381,7 @@ public class AkkaRpcServiceTest extends TestLogger {
 		CompletableFuture<Void> terminationFuture = akkaRpcService.stopService();
 
 		assertThat(terminationFuture.isDone(), is(false));
-		assertThat(akkaRpcService.getActorSystem().isTerminated(), is(false));
+		assertThat(akkaRpcService.getActorSystem().whenTerminated().isCompleted(), is(false));
 
 		countDownLatch.await();
 
diff --git a/pom.xml b/pom.xml
index f30ea67..f48c2ee 100644
--- a/pom.xml
+++ b/pom.xml
@@ -102,7 +102,7 @@ under the License.
 		<log4j.configuration>log4j-test.properties</log4j.configuration>
 		<flink.shaded.version>6.0</flink.shaded.version>
 		<guava.version>18.0</guava.version>
-		<akka.version>2.4.20</akka.version>
+		<akka.version>2.5.21</akka.version>
 		<java.version>1.8</java.version>
 		<slf4j.version>1.7.15</slf4j.version>
 		<log4j.version>1.2.17</log4j.version>
