This is an automated email from the ASF dual-hosted git repository. sewen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 0726c9ea609d37fec192fe7159012d793605f916 Author: Stephan Ewen <[email protected]> AuthorDate: Wed Mar 31 19:54:00 2021 +0200 [refactor][runtime] Extend AkkaRpcServiceUtils to support instantiating custom AkkaRpcServices. --- .../org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java index e31be68..92861fa 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java @@ -41,6 +41,7 @@ import java.net.InetAddress; import java.net.UnknownHostException; import java.util.Optional; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.BiFunction; import static org.apache.flink.util.NetUtils.isValidClientPort; import static org.apache.flink.util.Preconditions.checkArgument; @@ -340,6 +341,12 @@ public class AkkaRpcServiceUtils { } public AkkaRpcService createAndStart() throws Exception { + return createAndStart(AkkaRpcService::new); + } + + public AkkaRpcService createAndStart( + BiFunction<ActorSystem, AkkaRpcServiceConfiguration, AkkaRpcService> constructor) + throws Exception { if (actorSystemExecutorConfiguration == null) { actorSystemExecutorConfiguration = BootstrapTools.ForkJoinExecutorConfiguration.fromConfiguration( @@ -372,7 +379,7 @@ public class AkkaRpcServiceUtils { customConfig); } - return new AkkaRpcService( + return constructor.apply( actorSystem, AkkaRpcServiceConfiguration.fromConfiguration(configuration)); } }
