This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit ad2f2bccf5f933a85cae9417e1fe0559075ece84 Author: Chesnay Schepler <[email protected]> AuthorDate: Thu Jul 1 13:59:01 2021 +0200 [FLINK-18783] RpcSystem#load accepts Configuration --- .../src/main/java/org/apache/flink/runtime/rpc/RpcSystem.java | 10 ++++++++++ .../org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java | 2 +- .../java/org/apache/flink/runtime/minicluster/MiniCluster.java | 2 +- .../apache/flink/runtime/taskexecutor/TaskManagerRunner.java | 2 +- 4 files changed, 13 insertions(+), 3 deletions(-) diff --git a/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcSystem.java b/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcSystem.java index adc31c6..5934c60 100644 --- a/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcSystem.java +++ b/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcSystem.java @@ -78,6 +78,16 @@ public interface RpcSystem extends RpcSystemUtils, AutoCloseable { * @return loaded RpcSystem */ static RpcSystem load() { + return load(new Configuration()); + } + + /** + * Loads the RpcSystem. + * + * @param config Flink configuration + * @return loaded RpcSystem + */ + static RpcSystem load(Configuration config) { final ClassLoader classLoader = RpcSystem.class.getClassLoader(); return ServiceLoader.load(RpcSystem.class, classLoader).iterator().next(); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java index 8b24dfa..07b0a71 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java @@ -294,7 +294,7 @@ public abstract class ClusterEntrypoint implements AutoCloseableAsync, FatalErro LOG.info("Initializing cluster services."); synchronized (lock) { - rpcSystem = RpcSystem.load(); + rpcSystem = RpcSystem.load(configuration); commonRpcService = RpcUtils.createRemoteRpcService( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java index c9f24d8..d083783 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java @@ -276,7 +276,7 @@ public class MiniCluster implements AutoCloseableAsync { try { initializeIOFormatClasses(configuration); - rpcSystem = RpcSystem.load(); + rpcSystem = RpcSystem.load(configuration); LOG.info("Starting Metrics Registry"); metricRegistry = diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java index aaf7aa1..48eec9f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java @@ -139,7 +139,7 @@ public class TaskManagerRunner implements FatalErrorHandler { throws Exception { this.configuration = checkNotNull(configuration); - rpcSystem = RpcSystem.load(); + rpcSystem = RpcSystem.load(configuration); timeout = Time.fromDuration(configuration.get(AkkaOptions.ASK_TIMEOUT_DURATION));
