This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0726c9ea609d37fec192fe7159012d793605f916
Author: Stephan Ewen <[email protected]>
AuthorDate: Wed Mar 31 19:54:00 2021 +0200

    [refactor][runtime] Extend AkkaRpcServiceUtils to support instantiating custom AkkaRpcServices.
---
 .../org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java   | 9 ++++++++-
 1 file changed, 8 insertions(+), 1 deletion(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
index e31be68..92861fa 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
@@ -41,6 +41,7 @@ import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.Optional;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiFunction;
 
 import static org.apache.flink.util.NetUtils.isValidClientPort;
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -340,6 +341,12 @@ public class AkkaRpcServiceUtils {
         }
 
         public AkkaRpcService createAndStart() throws Exception {
+            return createAndStart(AkkaRpcService::new);
+        }
+
+        public AkkaRpcService createAndStart(
+                BiFunction<ActorSystem, AkkaRpcServiceConfiguration, 
AkkaRpcService> constructor)
+                throws Exception {
             if (actorSystemExecutorConfiguration == null) {
                 actorSystemExecutorConfiguration =
                         
BootstrapTools.ForkJoinExecutorConfiguration.fromConfiguration(
@@ -372,7 +379,7 @@ public class AkkaRpcServiceUtils {
                                 customConfig);
             }
 
-            return new AkkaRpcService(
+            return constructor.apply(
                     actorSystem, 
AkkaRpcServiceConfiguration.fromConfiguration(configuration));
         }
     }

Reply via email to