This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ad2f2bccf5f933a85cae9417e1fe0559075ece84
Author: Chesnay Schepler <[email protected]>
AuthorDate: Thu Jul 1 13:59:01 2021 +0200

    [FLINK-18783] RpcSystem#load accepts Configuration
---
 .../src/main/java/org/apache/flink/runtime/rpc/RpcSystem.java  | 10 ++++++++++
 .../org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java |  2 +-
 .../java/org/apache/flink/runtime/minicluster/MiniCluster.java |  2 +-
 .../apache/flink/runtime/taskexecutor/TaskManagerRunner.java   |  2 +-
 4 files changed, 13 insertions(+), 3 deletions(-)

diff --git a/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcSystem.java b/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcSystem.java
index adc31c6..5934c60 100644
--- a/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcSystem.java
+++ b/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcSystem.java
@@ -78,6 +78,16 @@ public interface RpcSystem extends RpcSystemUtils, AutoCloseable {
      * @return loaded RpcSystem
      */
     static RpcSystem load() {
+        return load(new Configuration());
+    }
+
+    /**
+     * Loads the RpcSystem.
+     *
+     * @param config Flink configuration
+     * @return loaded RpcSystem
+     */
+    static RpcSystem load(Configuration config) {
         final ClassLoader classLoader = RpcSystem.class.getClassLoader();
         return ServiceLoader.load(RpcSystem.class, classLoader).iterator().next();
     }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index 8b24dfa..07b0a71 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -294,7 +294,7 @@ public abstract class ClusterEntrypoint implements AutoCloseableAsync, FatalErro
         LOG.info("Initializing cluster services.");
 
         synchronized (lock) {
-            rpcSystem = RpcSystem.load();
+            rpcSystem = RpcSystem.load(configuration);
 
             commonRpcService =
                     RpcUtils.createRemoteRpcService(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index c9f24d8..d083783 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -276,7 +276,7 @@ public class MiniCluster implements AutoCloseableAsync {
             try {
                 initializeIOFormatClasses(configuration);
 
-                rpcSystem = RpcSystem.load();
+                rpcSystem = RpcSystem.load(configuration);
 
                 LOG.info("Starting Metrics Registry");
                 metricRegistry =
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index aaf7aa1..48eec9f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -139,7 +139,7 @@ public class TaskManagerRunner implements FatalErrorHandler {
             throws Exception {
         this.configuration = checkNotNull(configuration);
 
-        rpcSystem = RpcSystem.load();
+        rpcSystem = RpcSystem.load(configuration);
 
         timeout = Time.fromDuration(configuration.get(AkkaOptions.ASK_TIMEOUT_DURATION));
 

Reply via email to