This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new c678244a389 [FLINK-32684][rpc] Introduces RpcOptions and deprecates 
AkkaOptions (#24188)
c678244a389 is described below

commit c678244a3890273145a786b9e1bf1a4f96f6dcfd
Author: Matthias Pohl <[email protected]>
AuthorDate: Tue Jan 30 09:57:37 2024 +0100

    [FLINK-32684][rpc] Introduces RpcOptions and deprecates AkkaOptions (#24188)
    
    * Renames AkkaOptions into RpcOptions using the IDE and updates the class' 
JavaDoc
    * Copies RpcOptions to AkkaOptions and deprecates AkkaOptions. 
Additionally, makes each option reference the corresponding RpcOption member.
    * Moves deprecated RPC options out of RpcOptions and updates references
---
 .../apache/flink/client/program/ClientTest.java    |   5 +-
 .../apache/flink/configuration/AkkaOptions.java    | 265 ++++-----------------
 .../flink/configuration/ConfigConstants.java       |  42 ++--
 .../{AkkaOptions.java => RpcOptions.java}          | 108 +--------
 .../flink/configuration/TaskManagerOptions.java    |   6 +-
 .../rpc/pekko/ActorSystemBootstrapTools.java       |  16 +-
 .../runtime/rpc/pekko/PekkoInvocationHandler.java  |   4 +-
 .../rpc/pekko/PekkoRpcServiceConfiguration.java    |   8 +-
 .../runtime/rpc/pekko/PekkoRpcServiceUtils.java    |   6 +-
 .../apache/flink/runtime/rpc/pekko/PekkoUtils.java |  34 +--
 .../rpc/pekko/MessageSerializationTest.java        |   4 +-
 .../PekkoRpcActorOversizedResponseMessageTest.java |   6 +-
 .../flink/runtime/rpc/pekko/PekkoUtilsTest.java    |   4 +-
 .../DefaultSlotPoolServiceSchedulerFactory.java    |   4 +-
 .../runtime/jobmaster/JobMasterConfiguration.java  |   4 +-
 .../minicluster/MiniClusterConfiguration.java      |   6 +-
 .../StandaloneResourceManagerFactory.java          |   4 +-
 .../active/ActiveResourceManager.java              |   4 +-
 .../slotmanager/SlotManagerConfiguration.java      |   4 +-
 .../taskexecutor/TaskManagerConfiguration.java     |   4 +-
 .../runtime/taskexecutor/TaskManagerRunner.java    |   6 +-
 .../TaskManagerServicesConfiguration.java          |   4 +-
 .../runtime/webmonitor/WebMonitorEndpoint.java     |   4 +-
 .../TestingDefaultExecutionGraphBuilder.java       |   4 +-
 .../PartialConsumePipelinedResultTest.java         |   4 +-
 .../SlotCountExceedingParallelismTest.java         |   4 +-
 .../TestingResourceManagerFactory.java             |   4 +-
 .../flink/runtime/rpc/RpcConnectionTest.java       |   4 +-
 .../apache/flink/runtime/rpc/RpcSSLAuthITCase.java |   6 +-
 .../TaskManagerRunnerConfigurationTest.java        |   4 +-
 .../MiniClusterResourceConfiguration.java          |   4 +-
 .../runtime/testutils/ZooKeeperTestUtils.java      |   4 +-
 .../flink/streaming/api/datastream/DataStream.java |   4 +-
 .../operators/collect/CollectResultIterator.java   |   4 +-
 .../planner/connectors/CollectDynamicSink.java     |   4 +-
 .../BatchExecDynamicFilteringDataCollector.java    |   4 +-
 .../testframe/testsuites/SinkTestSuiteBase.java    |   4 +-
 .../testframe/testsuites/SourceTestSuiteBase.java  |   4 +-
 .../test/accumulators/AccumulatorLiveITCase.java   |   4 +-
 .../flink/test/cancelling/CancelingTestBase.java   |   6 +-
 .../EventTimeAllWindowCheckpointingITCase.java     |   6 +-
 .../EventTimeWindowCheckpointingITCase.java        |   4 +-
 .../checkpointing/UnalignedCheckpointTestBase.java |   4 +-
 .../flink/test/classloading/ClassLoaderITCase.java |   4 +-
 .../jar/StreamingCustomInputSplitProgram.java      |   4 +-
 .../test/operators/RemoteEnvironmentITCase.java    |   4 +-
 .../recovery/ProcessFailureCancelingITCase.java    |   4 +-
 .../TaskManagerDisconnectOnShutdownITCase.java     |   4 +-
 .../test/recovery/TaskManagerRunnerITCase.java     |   6 +-
 .../test/runtime/ShuffleCompressionITCase.java     |   6 +-
 .../apache/flink/yarn/YARNApplicationITCase.java   |   4 +-
 .../flink/yarn/YARNFileReplicationITCase.java      |   4 +-
 .../java/org/apache/flink/yarn/YARNITCase.java     |   4 +-
 .../apache/flink/yarn/YarnTaskExecutorRunner.java  |   4 +-
 .../apache/flink/yarn/FlinkYarnSessionCliTest.java |   6 +-
 55 files changed, 200 insertions(+), 492 deletions(-)

diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java 
b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
index 3e591162ba9..7b53a79a147 100644
--- 
a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
@@ -33,13 +33,13 @@ import org.apache.flink.client.FlinkPipelineTranslationUtil;
 import org.apache.flink.client.cli.ExecutionConfigAccessor;
 import org.apache.flink.client.deployment.ClusterClientJobClientAdapter;
 import org.apache.flink.client.testjar.ForbidConfigurationJob;
-import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.ConfigUtils;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.PipelineOptions;
+import org.apache.flink.configuration.RpcOptions;
 import org.apache.flink.core.execution.DetachedJobExecutionResult;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.core.execution.PipelineExecutor;
@@ -101,8 +101,7 @@ class ClientTest {
         config = new Configuration();
         config.set(JobManagerOptions.ADDRESS, "localhost");
 
-        config.set(
-                AkkaOptions.ASK_TIMEOUT_DURATION, 
AkkaOptions.ASK_TIMEOUT_DURATION.defaultValue());
+        config.set(RpcOptions.ASK_TIMEOUT_DURATION, 
RpcOptions.ASK_TIMEOUT_DURATION.defaultValue());
     }
 
     private Configuration fromPackagedProgram(
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java 
b/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java
index 5f3cccc495b..236ec3e3511 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java
@@ -18,154 +18,78 @@
 
 package org.apache.flink.configuration;
 
-import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.annotation.docs.Documentation;
 import org.apache.flink.configuration.description.Description;
 import org.apache.flink.util.TimeUtils;
 
 import java.time.Duration;
 
 import static org.apache.flink.configuration.description.LinkElement.link;
-import static org.apache.flink.configuration.description.TextElement.code;
 
-/** Akka configuration options. */
+/**
+ * RPC configuration options.
+ *
+ * @deprecated Use {@link RpcOptions} instead.
+ */
 @PublicEvolving
+@Deprecated // since 1.19.0
 public class AkkaOptions {
 
-    @Internal
-    @Documentation.ExcludeFromDocumentation("Internal use only")
-    public static final ConfigOption<Boolean> 
FORCE_RPC_INVOCATION_SERIALIZATION =
-            ConfigOptions.key("pekko.rpc.force-invocation-serialization")
-                    .booleanType()
-                    .defaultValue(false)
-                    
.withDeprecatedKeys("akka.rpc.force-invocation-serialization")
-                    .withDescription(
-                            Description.builder()
-                                    .text(
-                                            "Forces the serialization of all 
RPC invocations (that are not explicitly annotated with %s)."
-                                                    + "This option can be used 
to find serialization issues in the argument/response types without relying 
requiring HA setups."
-                                                    + "This option should not 
be enabled in production.",
-                                            
code("org.apache.flink.runtime.rpc.Local"))
-                                    .build());
-
     public static boolean 
isForceRpcInvocationSerializationEnabled(Configuration config) {
-        return config.getOptional(FORCE_RPC_INVOCATION_SERIALIZATION)
-                .orElse(
-                        FORCE_RPC_INVOCATION_SERIALIZATION.defaultValue()
-                                || System.getProperties()
-                                        
.containsKey(FORCE_RPC_INVOCATION_SERIALIZATION.key()));
+        return RpcOptions.isForceRpcInvocationSerializationEnabled(config);
     }
 
     /** Flag whether to capture call stacks for RPC ask calls. */
     public static final ConfigOption<Boolean> CAPTURE_ASK_CALLSTACK =
-            ConfigOptions.key("pekko.ask.callstack")
-                    .booleanType()
-                    .defaultValue(true)
-                    .withDeprecatedKeys("akka.ask.callstack")
-                    .withDescription(
-                            "If true, call stack for asynchronous asks are 
captured. That way, when an ask fails "
-                                    + "(for example times out), you get a 
proper exception, describing to the original method call and "
-                                    + "call site. Note that in case of having 
millions of concurrent RPC calls, this may add to the "
-                                    + "memory footprint.");
+            RpcOptions.CAPTURE_ASK_CALLSTACK;
 
     /** Timeout for Pekko ask calls. */
     public static final ConfigOption<Duration> ASK_TIMEOUT_DURATION =
-            ConfigOptions.key("pekko.ask.timeout")
-                    .durationType()
-                    .defaultValue(Duration.ofSeconds(10))
-                    .withDeprecatedKeys("akka.ask.timeout")
-                    .withDescription(
-                            "Timeout used for all futures and blocking Pekko 
calls. If Flink fails due to timeouts then you"
-                                    + " should try to increase this value. 
Timeouts can be caused by slow machines or a congested network. The"
-                                    + " timeout value requires a time-unit 
specifier (ms/s/min/h/d).");
+            RpcOptions.ASK_TIMEOUT_DURATION;
 
-    /** @deprecated Use {@link #ASK_TIMEOUT_DURATION} */
+    /** @deprecated Use {@link RpcOptions#ASK_TIMEOUT_DURATION} */
     @Deprecated
     public static final ConfigOption<String> ASK_TIMEOUT =
-            ConfigOptions.key(ASK_TIMEOUT_DURATION.key())
+            ConfigOptions.key(RpcOptions.ASK_TIMEOUT_DURATION.key())
                     .stringType()
                     .defaultValue(
-                            
TimeUtils.formatWithHighestUnit(ASK_TIMEOUT_DURATION.defaultValue()))
-                    .withDescription(ASK_TIMEOUT_DURATION.description());
+                            TimeUtils.formatWithHighestUnit(
+                                    
RpcOptions.ASK_TIMEOUT_DURATION.defaultValue()))
+                    
.withDescription(RpcOptions.ASK_TIMEOUT_DURATION.description());
 
     /** The Pekko tcp connection timeout. */
-    public static final ConfigOption<String> TCP_TIMEOUT =
-            ConfigOptions.key("pekko.tcp.timeout")
-                    .stringType()
-                    .defaultValue("20 s")
-                    .withDeprecatedKeys("akka.tcp.timeout")
-                    .withDescription(
-                            "Timeout for all outbound connections. If you 
should experience problems with connecting to a"
-                                    + " TaskManager due to a slow network, you 
should increase this value.");
+    public static final ConfigOption<String> TCP_TIMEOUT = 
RpcOptions.TCP_TIMEOUT;
 
     /** Timeout for the startup of the actor system. */
-    public static final ConfigOption<String> STARTUP_TIMEOUT =
-            ConfigOptions.key("pekko.startup-timeout")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDeprecatedKeys("akka.startup-timeout")
-                    .withDescription(
-                            "Timeout after which the startup of a remote 
component is considered being failed.");
+    public static final ConfigOption<String> STARTUP_TIMEOUT = 
RpcOptions.STARTUP_TIMEOUT;
 
     /** Override SSL support for the Pekko transport. */
-    public static final ConfigOption<Boolean> SSL_ENABLED =
-            ConfigOptions.key("pekko.ssl.enabled")
-                    .booleanType()
-                    .defaultValue(true)
-                    .withDeprecatedKeys("akka.ssl.enabled")
-                    .withDescription(
-                            "Turns on SSL for Pekko’s remote communication. 
This is applicable only when the global ssl flag"
-                                    + " security.ssl.enabled is set to true.");
+    public static final ConfigOption<Boolean> SSL_ENABLED = 
RpcOptions.SSL_ENABLED;
 
     /** Maximum framesize of Pekko messages. */
-    public static final ConfigOption<String> FRAMESIZE =
-            ConfigOptions.key("pekko.framesize")
-                    .stringType()
-                    .defaultValue("10485760b")
-                    .withDeprecatedKeys("akka.framesize")
-                    .withDescription(
-                            "Maximum size of messages which are sent between 
the JobManager and the TaskManagers. If Flink"
-                                    + " fails because messages exceed this 
limit, then you should increase it. The message size requires a"
-                                    + " size-unit specifier.");
+    public static final ConfigOption<String> FRAMESIZE = RpcOptions.FRAMESIZE;
 
     /** Maximum number of messages until another actor is executed by the same 
thread. */
     public static final ConfigOption<Integer> DISPATCHER_THROUGHPUT =
-            ConfigOptions.key("pekko.throughput")
-                    .intType()
-                    .defaultValue(15)
-                    .withDeprecatedKeys("akka.throughput")
-                    .withDescription(
-                            "Number of messages that are processed in a batch 
before returning the thread to the pool. Low"
-                                    + " values denote a fair scheduling 
whereas high values can increase the performance at the cost of unfairness.");
+            RpcOptions.DISPATCHER_THROUGHPUT;
 
     /** Log lifecycle events. */
     public static final ConfigOption<Boolean> LOG_LIFECYCLE_EVENTS =
-            ConfigOptions.key("pekko.log.lifecycle.events")
-                    .booleanType()
-                    .defaultValue(false)
-                    .withDeprecatedKeys("akka.log.lifecycle.events")
-                    .withDescription(
-                            "Turns on the Pekko’s remote logging of events. 
Set this value to 'true' in case of debugging.");
+            RpcOptions.LOG_LIFECYCLE_EVENTS;
 
     /** Timeout for all blocking calls that look up remote actors. */
     public static final ConfigOption<Duration> LOOKUP_TIMEOUT_DURATION =
-            ConfigOptions.key("pekko.lookup.timeout")
-                    .durationType()
-                    .defaultValue(Duration.ofSeconds(10))
-                    .withDeprecatedKeys("akka.lookup.timeout")
-                    .withDescription(
-                            "Timeout used for the lookup of the JobManager. 
The timeout value has to contain a time-unit"
-                                    + " specifier (ms/s/min/h/d).");
+            RpcOptions.LOOKUP_TIMEOUT_DURATION;
 
-    /** @deprecated use {@link #LOOKUP_TIMEOUT_DURATION} */
+    /** @deprecated use {@link RpcOptions#LOOKUP_TIMEOUT_DURATION} */
     @Deprecated
     public static final ConfigOption<String> LOOKUP_TIMEOUT =
-            ConfigOptions.key(LOOKUP_TIMEOUT_DURATION.key())
+            ConfigOptions.key(RpcOptions.LOOKUP_TIMEOUT_DURATION.key())
                     .stringType()
                     .defaultValue(
-                            
TimeUtils.formatWithHighestUnit(LOOKUP_TIMEOUT_DURATION.defaultValue()))
-                    .withDescription(LOOKUP_TIMEOUT_DURATION.description());
+                            TimeUtils.formatWithHighestUnit(
+                                    
RpcOptions.LOOKUP_TIMEOUT_DURATION.defaultValue()))
+                    
.withDescription(RpcOptions.LOOKUP_TIMEOUT_DURATION.description());
 
     /**
      * Timeout for all blocking calls on the client side.
@@ -183,167 +107,58 @@ public class AkkaOptions {
 
     /** Exit JVM on fatal Pekko errors. */
     public static final ConfigOption<Boolean> JVM_EXIT_ON_FATAL_ERROR =
-            ConfigOptions.key("pekko.jvm-exit-on-fatal-error")
-                    .booleanType()
-                    .defaultValue(true)
-                    .withDeprecatedKeys("akka.jvm-exit-on-fatal-error")
-                    .withDescription("Exit JVM on fatal Pekko errors.");
+            RpcOptions.JVM_EXIT_ON_FATAL_ERROR;
 
     /** Milliseconds a gate should be closed for after a remote connection was 
disconnected. */
-    public static final ConfigOption<Long> RETRY_GATE_CLOSED_FOR =
-            ConfigOptions.key("pekko.retry-gate-closed-for")
-                    .longType()
-                    .defaultValue(50L)
-                    .withDeprecatedKeys("akka.retry-gate-closed-for")
-                    .withDescription(
-                            "Milliseconds a gate should be closed for after a 
remote connection was disconnected.");
+    public static final ConfigOption<Long> RETRY_GATE_CLOSED_FOR = 
RpcOptions.RETRY_GATE_CLOSED_FOR;
 
     // ==================================================
     // Configurations for fork-join-executor.
     // ==================================================
 
     public static final ConfigOption<Double> 
FORK_JOIN_EXECUTOR_PARALLELISM_FACTOR =
-            ConfigOptions.key("pekko.fork-join-executor.parallelism-factor")
-                    .doubleType()
-                    .defaultValue(2.0)
-                    
.withDeprecatedKeys("akka.fork-join-executor.parallelism-factor")
-                    .withDescription(
-                            Description.builder()
-                                    .text(
-                                            "The parallelism factor is used to 
determine thread pool size using the"
-                                                    + " following formula: 
ceil(available processors * factor). Resulting size"
-                                                    + " is then bounded by the 
parallelism-min and parallelism-max values.")
-                                    .build());
+            RpcOptions.FORK_JOIN_EXECUTOR_PARALLELISM_FACTOR;
 
     public static final ConfigOption<Integer> 
FORK_JOIN_EXECUTOR_PARALLELISM_MIN =
-            ConfigOptions.key("pekko.fork-join-executor.parallelism-min")
-                    .intType()
-                    .defaultValue(8)
-                    
.withDeprecatedKeys("akka.fork-join-executor.parallelism-min")
-                    .withDescription(
-                            Description.builder()
-                                    .text(
-                                            "Min number of threads to cap 
factor-based parallelism number to.")
-                                    .build());
+            RpcOptions.FORK_JOIN_EXECUTOR_PARALLELISM_MIN;
 
     public static final ConfigOption<Integer> 
FORK_JOIN_EXECUTOR_PARALLELISM_MAX =
-            ConfigOptions.key("pekko.fork-join-executor.parallelism-max")
-                    .intType()
-                    .defaultValue(64)
-                    
.withDeprecatedKeys("akka.fork-join-executor.parallelism-max")
-                    .withDescription(
-                            Description.builder()
-                                    .text(
-                                            "Max number of threads to cap 
factor-based parallelism number to.")
-                                    .build());
+            RpcOptions.FORK_JOIN_EXECUTOR_PARALLELISM_MAX;
 
     public static final ConfigOption<Double> 
REMOTE_FORK_JOIN_EXECUTOR_PARALLELISM_FACTOR =
-            
ConfigOptions.key("pekko.remote-fork-join-executor.parallelism-factor")
-                    .doubleType()
-                    .defaultValue(2.0)
-                    .withDescription(
-                            Description.builder()
-                                    .text(
-                                            "The parallelism factor is used to 
determine thread pool size using the"
-                                                    + " following formula: 
ceil(available processors * factor). Resulting size"
-                                                    + " is then bounded by the 
parallelism-min and parallelism-max values.")
-                                    .build());
+            RpcOptions.REMOTE_FORK_JOIN_EXECUTOR_PARALLELISM_FACTOR;
 
     public static final ConfigOption<Integer> 
REMOTE_FORK_JOIN_EXECUTOR_PARALLELISM_MIN =
-            
ConfigOptions.key("pekko.remote-fork-join-executor.parallelism-min")
-                    .intType()
-                    .defaultValue(8)
-                    .withDescription(
-                            Description.builder()
-                                    .text(
-                                            "Min number of threads to cap 
factor-based parallelism number to.")
-                                    .build());
+            RpcOptions.REMOTE_FORK_JOIN_EXECUTOR_PARALLELISM_MIN;
 
     public static final ConfigOption<Integer> 
REMOTE_FORK_JOIN_EXECUTOR_PARALLELISM_MAX =
-            
ConfigOptions.key("pekko.remote-fork-join-executor.parallelism-max")
-                    .intType()
-                    .defaultValue(16)
-                    .withDescription(
-                            Description.builder()
-                                    .text(
-                                            "Max number of threads to cap 
factor-based parallelism number to.")
-                                    .build());
+            RpcOptions.REMOTE_FORK_JOIN_EXECUTOR_PARALLELISM_MAX;
 
     // ==================================================
     // Configurations for client-socket-work-pool.
     // ==================================================
 
     public static final ConfigOption<Integer> 
CLIENT_SOCKET_WORKER_POOL_SIZE_MIN =
-            ConfigOptions.key("pekko.client-socket-worker-pool.pool-size-min")
-                    .intType()
-                    .defaultValue(1)
-                    
.withDeprecatedKeys("akka.client-socket-worker-pool.pool-size-min")
-                    .withDescription(
-                            Description.builder()
-                                    .text("Min number of threads to cap 
factor-based number to.")
-                                    .build());
+            RpcOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_MIN;
 
     public static final ConfigOption<Integer> 
CLIENT_SOCKET_WORKER_POOL_SIZE_MAX =
-            ConfigOptions.key("pekko.client-socket-worker-pool.pool-size-max")
-                    .intType()
-                    .defaultValue(2)
-                    
.withDeprecatedKeys("akka.client-socket-worker-pool.pool-size-max")
-                    .withDescription(
-                            Description.builder()
-                                    .text("Max number of threads to cap 
factor-based number to.")
-                                    .build());
+            RpcOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_MAX;
 
     public static final ConfigOption<Double> 
CLIENT_SOCKET_WORKER_POOL_SIZE_FACTOR =
-            
ConfigOptions.key("pekko.client-socket-worker-pool.pool-size-factor")
-                    .doubleType()
-                    .defaultValue(1.0)
-                    
.withDeprecatedKeys("akka.client-socket-worker-pool.pool-size-factor")
-                    .withDescription(
-                            Description.builder()
-                                    .text(
-                                            "The pool size factor is used to 
determine thread pool size"
-                                                    + " using the following 
formula: ceil(available processors * factor)."
-                                                    + " Resulting size is then 
bounded by the pool-size-min and"
-                                                    + " pool-size-max values.")
-                                    .build());
+            RpcOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_FACTOR;
 
     // ==================================================
     // Configurations for server-socket-work-pool.
     // ==================================================
 
     public static final ConfigOption<Integer> 
SERVER_SOCKET_WORKER_POOL_SIZE_MIN =
-            ConfigOptions.key("pekko.server-socket-worker-pool.pool-size-min")
-                    .intType()
-                    .defaultValue(1)
-                    
.withDeprecatedKeys("akka.server-socket-worker-pool.pool-size-min")
-                    .withDescription(
-                            Description.builder()
-                                    .text("Min number of threads to cap 
factor-based number to.")
-                                    .build());
+            RpcOptions.SERVER_SOCKET_WORKER_POOL_SIZE_MIN;
 
     public static final ConfigOption<Integer> 
SERVER_SOCKET_WORKER_POOL_SIZE_MAX =
-            ConfigOptions.key("pekko.server-socket-worker-pool.pool-size-max")
-                    .intType()
-                    .defaultValue(2)
-                    
.withDeprecatedKeys("akka.server-socket-worker-pool.pool-size-max")
-                    .withDescription(
-                            Description.builder()
-                                    .text("Max number of threads to cap 
factor-based number to.")
-                                    .build());
+            RpcOptions.SERVER_SOCKET_WORKER_POOL_SIZE_MAX;
 
     public static final ConfigOption<Double> 
SERVER_SOCKET_WORKER_POOL_SIZE_FACTOR =
-            
ConfigOptions.key("pekko.server-socket-worker-pool.pool-size-factor")
-                    .doubleType()
-                    .defaultValue(1.0)
-                    
.withDeprecatedKeys("akka.server-socket-worker-pool.pool-size-factor")
-                    .withDescription(
-                            Description.builder()
-                                    .text(
-                                            "The pool size factor is used to 
determine thread pool size"
-                                                    + " using the following 
formula: ceil(available processors * factor)."
-                                                    + " Resulting size is then 
bounded by the pool-size-min and"
-                                                    + " pool-size-max values.")
-                                    .build());
+            RpcOptions.SERVER_SOCKET_WORKER_POOL_SIZE_FACTOR;
 
     // ==================================================
     // Deprecated options
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index b7751abd07b..58a33a6060a 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -764,14 +764,14 @@ public final class ConfigConstants {
     /**
      * Timeout for the startup of the actor system.
      *
-     * @deprecated Use {@link AkkaOptions#STARTUP_TIMEOUT} instead.
+     * @deprecated Use {@link RpcOptions#STARTUP_TIMEOUT} instead.
      */
     @Deprecated public static final String AKKA_STARTUP_TIMEOUT = 
"akka.startup-timeout";
 
     /**
      * Heartbeat interval of the transport failure detector.
      *
-     * @deprecated Use {@link AkkaOptions#TRANSPORT_HEARTBEAT_INTERVAL} 
instead.
+     * @deprecated Use {@link RpcOptions#TRANSPORT_HEARTBEAT_INTERVAL} instead.
      */
     @Deprecated
     public static final String AKKA_TRANSPORT_HEARTBEAT_INTERVAL =
@@ -780,7 +780,7 @@ public final class ConfigConstants {
     /**
      * Allowed heartbeat pause for the transport failure detector.
      *
-     * @deprecated Use {@link AkkaOptions#TRANSPORT_HEARTBEAT_PAUSE} instead.
+     * @deprecated Use {@link RpcOptions#TRANSPORT_HEARTBEAT_PAUSE} instead.
      */
     @Deprecated
     public static final String AKKA_TRANSPORT_HEARTBEAT_PAUSE = 
"akka.transport.heartbeat.pause";
@@ -788,7 +788,7 @@ public final class ConfigConstants {
     /**
      * Detection threshold of transport failure detector.
      *
-     * @deprecated Use {@link AkkaOptions#TRANSPORT_THRESHOLD} instead.
+     * @deprecated Use {@link RpcOptions#TRANSPORT_THRESHOLD} instead.
      */
     @Deprecated public static final String AKKA_TRANSPORT_THRESHOLD = 
"akka.transport.threshold";
 
@@ -818,49 +818,49 @@ public final class ConfigConstants {
     /**
      * Akka TCP timeout.
      *
-     * @deprecated Use {@link AkkaOptions#TCP_TIMEOUT} instead.
+     * @deprecated Use {@link RpcOptions#TCP_TIMEOUT} instead.
      */
     @Deprecated public static final String AKKA_TCP_TIMEOUT = 
"akka.tcp.timeout";
 
     /**
      * Override SSL support for the Akka transport.
      *
-     * @deprecated Use {@link AkkaOptions#SSL_ENABLED} instead.
+     * @deprecated Use {@link RpcOptions#SSL_ENABLED} instead.
      */
     @Deprecated public static final String AKKA_SSL_ENABLED = 
"akka.ssl.enabled";
 
     /**
      * Maximum framesize of akka messages.
      *
-     * @deprecated Use {@link AkkaOptions#FRAMESIZE} instead.
+     * @deprecated Use {@link RpcOptions#FRAMESIZE} instead.
      */
     @Deprecated public static final String AKKA_FRAMESIZE = "akka.framesize";
 
     /**
      * Maximum number of messages until another actor is executed by the same 
thread.
      *
-     * @deprecated Use {@link AkkaOptions#DISPATCHER_THROUGHPUT} instead.
+     * @deprecated Use {@link RpcOptions#DISPATCHER_THROUGHPUT} instead.
      */
     @Deprecated public static final String AKKA_DISPATCHER_THROUGHPUT = 
"akka.throughput";
 
     /**
      * Log lifecycle events.
      *
-     * @deprecated Use {@link AkkaOptions#LOG_LIFECYCLE_EVENTS} instead.
+     * @deprecated Use {@link RpcOptions#LOG_LIFECYCLE_EVENTS} instead.
      */
     @Deprecated public static final String AKKA_LOG_LIFECYCLE_EVENTS = 
"akka.log.lifecycle.events";
 
     /**
      * Timeout for all blocking calls on the cluster side.
      *
-     * @deprecated Use {@link AkkaOptions#ASK_TIMEOUT} instead.
+     * @deprecated Use {@link RpcOptions#ASK_TIMEOUT_DURATION} instead.
      */
     @Deprecated public static final String AKKA_ASK_TIMEOUT = 
"akka.ask.timeout";
 
     /**
      * Timeout for all blocking calls that look up remote actors.
      *
-     * @deprecated Use {@link AkkaOptions#LOOKUP_TIMEOUT} instead.
+     * @deprecated Use {@link RpcOptions#LOOKUP_TIMEOUT_DURATION} instead.
      */
     @Deprecated public static final String AKKA_LOOKUP_TIMEOUT = 
"akka.lookup.timeout";
 
@@ -874,7 +874,7 @@ public final class ConfigConstants {
     /**
      * Exit JVM on fatal Akka errors.
      *
-     * @deprecated Use {@link AkkaOptions#JVM_EXIT_ON_FATAL_ERROR} instead.
+     * @deprecated Use {@link RpcOptions#JVM_EXIT_ON_FATAL_ERROR} instead.
      */
     @Deprecated
     public static final String AKKA_JVM_EXIT_ON_FATAL_ERROR = 
"akka.jvm-exit-on-fatal-error";
@@ -1547,37 +1547,37 @@ public final class ConfigConstants {
 
     // ------------------------------ Akka Values 
------------------------------
 
-    /** @deprecated Use {@link AkkaOptions#TRANSPORT_HEARTBEAT_INTERVAL} 
instead. */
+    /** @deprecated Use {@link RpcOptions#TRANSPORT_HEARTBEAT_INTERVAL} 
instead. */
     @Deprecated public static final String 
DEFAULT_AKKA_TRANSPORT_HEARTBEAT_INTERVAL = "1000 s";
 
-    /** @deprecated Use {@link AkkaOptions#TRANSPORT_HEARTBEAT_PAUSE} instead. 
*/
+    /** @deprecated Use {@link RpcOptions#TRANSPORT_HEARTBEAT_PAUSE} instead. 
*/
     @Deprecated public static final String 
DEFAULT_AKKA_TRANSPORT_HEARTBEAT_PAUSE = "6000 s";
 
-    /** @deprecated Use {@link AkkaOptions#TRANSPORT_THRESHOLD} instead. */
+    /** @deprecated Use {@link RpcOptions#TRANSPORT_THRESHOLD} instead. */
     @Deprecated public static final double DEFAULT_AKKA_TRANSPORT_THRESHOLD = 
300.0;
 
     /** @deprecated This default value is no longer used and has no effect on 
Flink. */
     @Deprecated public static final double DEFAULT_AKKA_WATCH_THRESHOLD = 12;
 
-    /** @deprecated Use {@link AkkaOptions#DISPATCHER_THROUGHPUT} instead. */
+    /** @deprecated Use {@link RpcOptions#DISPATCHER_THROUGHPUT} instead. */
     @Deprecated public static final int DEFAULT_AKKA_DISPATCHER_THROUGHPUT = 
15;
 
-    /** @deprecated Use {@link AkkaOptions#LOG_LIFECYCLE_EVENTS} instead. */
+    /** @deprecated Use {@link RpcOptions#LOG_LIFECYCLE_EVENTS} instead. */
     @Deprecated public static final boolean DEFAULT_AKKA_LOG_LIFECYCLE_EVENTS 
= false;
 
-    /** @deprecated Use {@link AkkaOptions#FRAMESIZE} instead. */
+    /** @deprecated Use {@link RpcOptions#FRAMESIZE} instead. */
     @Deprecated public static final String DEFAULT_AKKA_FRAMESIZE = 
"10485760b";
 
-    /** @deprecated Use {@link AkkaOptions#ASK_TIMEOUT} instead. */
+    /** @deprecated Use {@link RpcOptions#ASK_TIMEOUT_DURATION} instead. */
     @Deprecated public static final String DEFAULT_AKKA_ASK_TIMEOUT = "10 s";
 
-    /** @deprecated Use {@link AkkaOptions#LOOKUP_TIMEOUT} instead. */
+    /** @deprecated Use {@link RpcOptions#LOOKUP_TIMEOUT_DURATION} instead. */
     @Deprecated public static final String DEFAULT_AKKA_LOOKUP_TIMEOUT = "10 
s";
 
     /** @deprecated Use {@code ClientOptions#CLIENT_TIMEOUT} instead. */
     @Deprecated public static final String DEFAULT_AKKA_CLIENT_TIMEOUT = "60 
s";
 
-    /** @deprecated Use {@link AkkaOptions#SSL_ENABLED} instead. */
+    /** @deprecated Use {@link RpcOptions#SSL_ENABLED} instead. */
     @Deprecated public static final boolean DEFAULT_AKKA_SSL_ENABLED = true;
 
     // ----------------------------- SSL Values 
--------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java 
b/flink-core/src/main/java/org/apache/flink/configuration/RpcOptions.java
similarity index 75%
copy from 
flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java
copy to flink-core/src/main/java/org/apache/flink/configuration/RpcOptions.java
index 5f3cccc495b..a2196634788 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/RpcOptions.java
@@ -22,16 +22,14 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.docs.Documentation;
 import org.apache.flink.configuration.description.Description;
-import org.apache.flink.util.TimeUtils;
 
 import java.time.Duration;
 
-import static org.apache.flink.configuration.description.LinkElement.link;
 import static org.apache.flink.configuration.description.TextElement.code;
 
-/** Akka configuration options. */
+/** RPC configuration options. */
 @PublicEvolving
-public class AkkaOptions {
+public class RpcOptions {
 
     @Internal
     @Documentation.ExcludeFromDocumentation("Internal use only")
@@ -80,15 +78,6 @@ public class AkkaOptions {
                                     + " should try to increase this value. 
Timeouts can be caused by slow machines or a congested network. The"
                                     + " timeout value requires a time-unit 
specifier (ms/s/min/h/d).");
 
-    /** @deprecated Use {@link #ASK_TIMEOUT_DURATION} */
-    @Deprecated
-    public static final ConfigOption<String> ASK_TIMEOUT =
-            ConfigOptions.key(ASK_TIMEOUT_DURATION.key())
-                    .stringType()
-                    .defaultValue(
-                            
TimeUtils.formatWithHighestUnit(ASK_TIMEOUT_DURATION.defaultValue()))
-                    .withDescription(ASK_TIMEOUT_DURATION.description());
-
     /** The Pekko tcp connection timeout. */
     public static final ConfigOption<String> TCP_TIMEOUT =
             ConfigOptions.key("pekko.tcp.timeout")
@@ -158,29 +147,6 @@ public class AkkaOptions {
                             "Timeout used for the lookup of the JobManager. 
The timeout value has to contain a time-unit"
                                     + " specifier (ms/s/min/h/d).");
 
-    /** @deprecated use {@link #LOOKUP_TIMEOUT_DURATION} */
-    @Deprecated
-    public static final ConfigOption<String> LOOKUP_TIMEOUT =
-            ConfigOptions.key(LOOKUP_TIMEOUT_DURATION.key())
-                    .stringType()
-                    .defaultValue(
-                            
TimeUtils.formatWithHighestUnit(LOOKUP_TIMEOUT_DURATION.defaultValue()))
-                    .withDescription(LOOKUP_TIMEOUT_DURATION.description());
-
-    /**
-     * Timeout for all blocking calls on the client side.
-     *
-     * @deprecated Use the {@code ClientOptions.CLIENT_TIMEOUT} instead.
-     */
-    @Deprecated
-    public static final ConfigOption<String> CLIENT_TIMEOUT =
-            ConfigOptions.key("akka.client.timeout")
-                    .stringType()
-                    .defaultValue("60 s")
-                    .withDescription(
-                            "DEPRECATED: Use the \"client.timeout\" instead."
-                                    + " Timeout for all blocking calls on the 
client side.");
-
     /** Exit JVM on fatal Pekko errors. */
     public static final ConfigOption<Boolean> JVM_EXIT_ON_FATAL_ERROR =
             ConfigOptions.key("pekko.jvm-exit-on-fatal-error")
@@ -344,74 +310,4 @@ public class AkkaOptions {
                                                     + " Resulting size is then 
bounded by the pool-size-min and"
                                                     + " pool-size-max values.")
                                     .build());
-
-    // ==================================================
-    // Deprecated options
-    // ==================================================
-
-    /**
-     * The Akka death watch heartbeat interval.
-     *
-     * @deprecated Don't use this option anymore. It has no effect on Flink.
-     */
-    @Deprecated
-    public static final ConfigOption<String> WATCH_HEARTBEAT_INTERVAL =
-            ConfigOptions.key("akka.watch.heartbeat.interval")
-                    .stringType()
-                    .defaultValue(ASK_TIMEOUT.defaultValue())
-                    .withDescription(
-                            Description.builder()
-                                    .text(
-                                            "Heartbeat interval for Akka’s 
DeathWatch mechanism to detect dead TaskManagers. If"
-                                                    + " TaskManagers are 
wrongly marked dead because of lost or delayed heartbeat messages, then you"
-                                                    + " should decrease this 
value or increase akka.watch.heartbeat.pause. A thorough description of"
-                                                    + " Akka’s DeathWatch can 
be found %s",
-                                            link(
-                                                    
"https://pekko.apache.org/docs/pekko/current/remoting-artery.html#failure-detector";,
-                                                    "here"))
-                                    .build());
-
-    /**
-     * The maximum acceptable Akka death watch heartbeat pause.
-     *
-     * @deprecated Don't use this option anymore. It has no effect on Flink.
-     */
-    @Deprecated
-    public static final ConfigOption<String> WATCH_HEARTBEAT_PAUSE =
-            ConfigOptions.key("akka.watch.heartbeat.pause")
-                    .stringType()
-                    .defaultValue("60 s")
-                    .withDescription(
-                            Description.builder()
-                                    .text(
-                                            "Acceptable heartbeat pause for 
Akka’s DeathWatch mechanism. A low value does not allow an"
-                                                    + " irregular heartbeat. 
If TaskManagers are wrongly marked dead because of lost or delayed"
-                                                    + " heartbeat messages, 
then you should increase this value or decrease akka.watch.heartbeat.interval."
-                                                    + " Higher value increases 
the time to detect a dead TaskManager. A thorough description of Akka’s"
-                                                    + " DeathWatch can be 
found %s",
-                                            link(
-                                                    
"https://pekko.apache.org/docs/pekko/current/remoting-artery.html#failure-detector";,
-                                                    "here"))
-                                    .build());
-
-    /**
-     * Detection threshold for the phi accrual watch failure detector.
-     *
-     * @deprecated Don't use this option anymore. It has no effect on Flink.
-     */
-    @Deprecated
-    public static final ConfigOption<Integer> WATCH_THRESHOLD =
-            ConfigOptions.key("akka.watch.threshold")
-                    .intType()
-                    .defaultValue(12)
-                    .withDescription(
-                            Description.builder()
-                                    .text(
-                                            "Threshold for the DeathWatch 
failure detector. A low value is prone to false positives whereas"
-                                                    + " a high value increases 
the time to detect a dead TaskManager. A thorough description of Akka’s"
-                                                    + " DeathWatch can be 
found %s",
-                                            link(
-                                                    
"https://pekko.apache.org/docs/pekko/current/remoting-artery.html#failure-detector";,
-                                                    "here"))
-                                    .build());
 }
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
index f6c64c2f6db..aaeee7e74b4 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
@@ -266,15 +266,15 @@ public class TaskManagerOptions {
     public static final ConfigOption<Duration> SLOT_TIMEOUT =
             key("taskmanager.slot.timeout")
                     .durationType()
-                    
.defaultValue(AkkaOptions.ASK_TIMEOUT_DURATION.defaultValue())
-                    .withFallbackKeys(AkkaOptions.ASK_TIMEOUT_DURATION.key())
+                    
.defaultValue(RpcOptions.ASK_TIMEOUT_DURATION.defaultValue())
+                    .withFallbackKeys(RpcOptions.ASK_TIMEOUT_DURATION.key())
                     .withDescription(
                             Description.builder()
                                     .text(
                                             "Timeout used for identifying 
inactive slots. The TaskManager will free the slot if it does not become active 
"
                                                     + "within the given amount 
of time. Inactive slots can be caused by an out-dated slot request. If no "
                                                     + "value is configured, 
then it will fall back to %s.",
-                                            
code(AkkaOptions.ASK_TIMEOUT_DURATION.key()))
+                                            
code(RpcOptions.ASK_TIMEOUT_DURATION.key()))
                                     .build());
 
     @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER)
diff --git 
a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/ActorSystemBootstrapTools.java
 
b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/ActorSystemBootstrapTools.java
index 5149919ab83..86ccdca9226 100644
--- 
a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/ActorSystemBootstrapTools.java
+++ 
b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/ActorSystemBootstrapTools.java
@@ -18,8 +18,8 @@
 package org.apache.flink.runtime.rpc.pekko;
 
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RpcOptions;
 import org.apache.flink.runtime.rpc.RpcSystem;
 import org.apache.flink.util.NetUtils;
 
@@ -264,11 +264,9 @@ public class ActorSystemBootstrapTools {
     public static RpcSystem.ForkJoinExecutorConfiguration 
getForkJoinExecutorConfiguration(
             final Configuration configuration) {
         final double parallelismFactor =
-                
configuration.get(AkkaOptions.FORK_JOIN_EXECUTOR_PARALLELISM_FACTOR);
-        final int minParallelism =
-                
configuration.get(AkkaOptions.FORK_JOIN_EXECUTOR_PARALLELISM_MIN);
-        final int maxParallelism =
-                
configuration.get(AkkaOptions.FORK_JOIN_EXECUTOR_PARALLELISM_MAX);
+                
configuration.get(RpcOptions.FORK_JOIN_EXECUTOR_PARALLELISM_FACTOR);
+        final int minParallelism = 
configuration.get(RpcOptions.FORK_JOIN_EXECUTOR_PARALLELISM_MIN);
+        final int maxParallelism = 
configuration.get(RpcOptions.FORK_JOIN_EXECUTOR_PARALLELISM_MAX);
 
         return new RpcSystem.ForkJoinExecutorConfiguration(
                 parallelismFactor, minParallelism, maxParallelism);
@@ -277,11 +275,11 @@ public class ActorSystemBootstrapTools {
     public static RpcSystem.ForkJoinExecutorConfiguration 
getRemoteForkJoinExecutorConfiguration(
             final Configuration configuration) {
         final double parallelismFactor =
-                
configuration.get(AkkaOptions.REMOTE_FORK_JOIN_EXECUTOR_PARALLELISM_FACTOR);
+                
configuration.get(RpcOptions.REMOTE_FORK_JOIN_EXECUTOR_PARALLELISM_FACTOR);
         final int minParallelism =
-                
configuration.get(AkkaOptions.REMOTE_FORK_JOIN_EXECUTOR_PARALLELISM_MIN);
+                
configuration.get(RpcOptions.REMOTE_FORK_JOIN_EXECUTOR_PARALLELISM_MIN);
         final int maxParallelism =
-                
configuration.get(AkkaOptions.REMOTE_FORK_JOIN_EXECUTOR_PARALLELISM_MAX);
+                
configuration.get(RpcOptions.REMOTE_FORK_JOIN_EXECUTOR_PARALLELISM_MAX);
 
         return new RpcSystem.ForkJoinExecutorConfiguration(
                 parallelismFactor, minParallelism, maxParallelism);
diff --git 
a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/PekkoInvocationHandler.java
 
b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/PekkoInvocationHandler.java
index bc117df44b7..496304dc6aa 100644
--- 
a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/PekkoInvocationHandler.java
+++ 
b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/PekkoInvocationHandler.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.rpc.pekko;
 
-import org.apache.flink.configuration.AkkaOptions;
+import org.apache.flink.configuration.RpcOptions;
 import org.apache.flink.runtime.concurrent.pekko.ScalaFutureUtils;
 import org.apache.flink.runtime.rpc.FencedRpcGateway;
 import org.apache.flink.runtime.rpc.Local;
@@ -400,7 +400,7 @@ class PekkoInvocationHandler implements InvocationHandler, 
PekkoBasedEndpoint, R
                                             + "more time for responding, due 
to problems like slow machines or network jitters. In that case, you can try to 
increase %s.",
                                     rpcInvocation,
                                     recipient,
-                                    AkkaOptions.ASK_TIMEOUT_DURATION.key()));
+                                    RpcOptions.ASK_TIMEOUT_DURATION.key()));
         }
 
         newException.initCause(exception);
diff --git 
a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/PekkoRpcServiceConfiguration.java
 
b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/PekkoRpcServiceConfiguration.java
index e52c277b37b..ee19ed005df 100644
--- 
a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/PekkoRpcServiceConfiguration.java
+++ 
b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/PekkoRpcServiceConfiguration.java
@@ -17,8 +17,8 @@
 
 package org.apache.flink.runtime.rpc.pekko;
 
-import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RpcOptions;
 
 import javax.annotation.Nonnull;
 
@@ -77,14 +77,14 @@ public class PekkoRpcServiceConfiguration {
     }
 
     public static PekkoRpcServiceConfiguration fromConfiguration(Configuration 
configuration) {
-        final Duration timeout = 
configuration.get(AkkaOptions.ASK_TIMEOUT_DURATION);
+        final Duration timeout = 
configuration.get(RpcOptions.ASK_TIMEOUT_DURATION);
 
         final long maximumFramesize = 
PekkoRpcServiceUtils.extractMaximumFramesize(configuration);
 
-        final boolean captureAskCallStacks = 
configuration.get(AkkaOptions.CAPTURE_ASK_CALLSTACK);
+        final boolean captureAskCallStacks = 
configuration.get(RpcOptions.CAPTURE_ASK_CALLSTACK);
 
         final boolean forceRpcInvocationSerialization =
-                
AkkaOptions.isForceRpcInvocationSerializationEnabled(configuration);
+                
RpcOptions.isForceRpcInvocationSerializationEnabled(configuration);
 
         return new PekkoRpcServiceConfiguration(
                 configuration,
diff --git 
a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/PekkoRpcServiceUtils.java
 
b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/PekkoRpcServiceUtils.java
index 4de9914aa1c..c3eb3925e96 100644
--- 
a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/PekkoRpcServiceUtils.java
+++ 
b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/PekkoRpcServiceUtils.java
@@ -19,8 +19,8 @@
 package org.apache.flink.runtime.rpc.pekko;
 
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RpcOptions;
 import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.runtime.rpc.AddressResolution;
 import org.apache.flink.runtime.rpc.RpcService;
@@ -130,7 +130,7 @@ public class PekkoRpcServiceUtils {
         checkNotNull(config, "config is null");
 
         final boolean sslEnabled =
-                config.get(AkkaOptions.SSL_ENABLED) && 
SecurityOptions.isInternalSSLEnabled(config);
+                config.get(RpcOptions.SSL_ENABLED) && 
SecurityOptions.isInternalSSLEnabled(config);
 
         return getRpcUrl(
                 hostname,
@@ -232,7 +232,7 @@ public class PekkoRpcServiceUtils {
     // ------------------------------------------------------------------------
 
     public static long extractMaximumFramesize(Configuration configuration) {
-        String maxFrameSizeStr = configuration.get(AkkaOptions.FRAMESIZE);
+        String maxFrameSizeStr = configuration.get(RpcOptions.FRAMESIZE);
         String configStr = String.format(SIMPLE_CONFIG_TEMPLATE, 
maxFrameSizeStr);
         Config config = ConfigFactory.parseString(configStr);
         return config.getBytes(MAXIMUM_FRAME_SIZE_PATH);
diff --git 
a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/PekkoUtils.java
 
b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/PekkoUtils.java
index 660235e125e..095ae648726 100644
--- 
a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/PekkoUtils.java
+++ 
b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/PekkoUtils.java
@@ -18,8 +18,8 @@
 package org.apache.flink.runtime.rpc.pekko;
 
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RpcOptions;
 import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.runtime.concurrent.pekko.ScalaFutureUtils;
 import org.apache.flink.runtime.rpc.RpcSystem;
@@ -70,11 +70,11 @@ class PekkoUtils {
      * @return Flink's basic Pekko config
      */
     private static Config getBasicConfig(Configuration configuration) {
-        final int throughput = 
configuration.get(AkkaOptions.DISPATCHER_THROUGHPUT);
+        final int throughput = 
configuration.get(RpcOptions.DISPATCHER_THROUGHPUT);
         final String jvmExitOnFatalError =
-                
booleanToOnOrOff(configuration.get(AkkaOptions.JVM_EXIT_ON_FATAL_ERROR));
+                
booleanToOnOrOff(configuration.get(RpcOptions.JVM_EXIT_ON_FATAL_ERROR));
         final String logLifecycleEvents =
-                
booleanToOnOrOff(configuration.get(AkkaOptions.LOG_LIFECYCLE_EVENTS));
+                
booleanToOnOrOff(configuration.get(RpcOptions.LOG_LIFECYCLE_EVENTS));
         final String supervisorStrategy = 
EscalatingSupervisorStrategy.class.getCanonicalName();
 
         return new ConfigBuilder()
@@ -206,39 +206,39 @@ class PekkoUtils {
 
     private static void addBaseRemoteConfig(
             ConfigBuilder configBuilder, Configuration configuration, int 
port, int externalPort) {
-        final Duration askTimeout = 
configuration.get(AkkaOptions.ASK_TIMEOUT_DURATION);
+        final Duration askTimeout = 
configuration.get(RpcOptions.ASK_TIMEOUT_DURATION);
 
         final String startupTimeout =
                 TimeUtils.getStringInMillis(
                         TimeUtils.parseDuration(
                                 configuration.get(
-                                        AkkaOptions.STARTUP_TIMEOUT,
+                                        RpcOptions.STARTUP_TIMEOUT,
                                         TimeUtils.getStringInMillis(
                                                 
askTimeout.multipliedBy(10L)))));
 
         final String tcpTimeout =
                 TimeUtils.getStringInMillis(
-                        
TimeUtils.parseDuration(configuration.get(AkkaOptions.TCP_TIMEOUT)));
+                        
TimeUtils.parseDuration(configuration.get(RpcOptions.TCP_TIMEOUT)));
 
-        final String framesize = configuration.get(AkkaOptions.FRAMESIZE);
+        final String framesize = configuration.get(RpcOptions.FRAMESIZE);
 
         final int clientSocketWorkerPoolPoolSizeMin =
-                
configuration.get(AkkaOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_MIN);
+                
configuration.get(RpcOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_MIN);
         final int clientSocketWorkerPoolPoolSizeMax =
-                
configuration.get(AkkaOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_MAX);
+                
configuration.get(RpcOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_MAX);
         final double clientSocketWorkerPoolPoolSizeFactor =
-                
configuration.get(AkkaOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_FACTOR);
+                
configuration.get(RpcOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_FACTOR);
         final int serverSocketWorkerPoolPoolSizeMin =
-                
configuration.get(AkkaOptions.SERVER_SOCKET_WORKER_POOL_SIZE_MIN);
+                
configuration.get(RpcOptions.SERVER_SOCKET_WORKER_POOL_SIZE_MIN);
         final int serverSocketWorkerPoolPoolSizeMax =
-                
configuration.get(AkkaOptions.SERVER_SOCKET_WORKER_POOL_SIZE_MAX);
+                
configuration.get(RpcOptions.SERVER_SOCKET_WORKER_POOL_SIZE_MAX);
         final double serverSocketWorkerPoolPoolSizeFactor =
-                
configuration.get(AkkaOptions.SERVER_SOCKET_WORKER_POOL_SIZE_FACTOR);
+                
configuration.get(RpcOptions.SERVER_SOCKET_WORKER_POOL_SIZE_FACTOR);
 
         final String logLifecycleEvents =
-                
booleanToOnOrOff(configuration.get(AkkaOptions.LOG_LIFECYCLE_EVENTS));
+                
booleanToOnOrOff(configuration.get(RpcOptions.LOG_LIFECYCLE_EVENTS));
 
-        final long retryGateClosedFor = 
configuration.get(AkkaOptions.RETRY_GATE_CLOSED_FOR);
+        final long retryGateClosedFor = 
configuration.get(RpcOptions.RETRY_GATE_CLOSED_FOR);
 
         configBuilder
                 .add("pekko {")
@@ -312,7 +312,7 @@ class PekkoUtils {
             ConfigBuilder configBuilder, Configuration configuration) {
 
         final boolean enableSSLConfig =
-                configuration.get(AkkaOptions.SSL_ENABLED)
+                configuration.get(RpcOptions.SSL_ENABLED)
                         && SecurityOptions.isInternalSSLEnabled(configuration);
 
         final String enableSSL = booleanToOnOrOff(enableSSLConfig);
diff --git 
a/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/pekko/MessageSerializationTest.java
 
b/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/pekko/MessageSerializationTest.java
index 7ad851f0322..22aeb2f31dd 100644
--- 
a/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/pekko/MessageSerializationTest.java
+++ 
b/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/pekko/MessageSerializationTest.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.runtime.rpc.pekko;
 
-import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RpcOptions;
 import org.apache.flink.runtime.rpc.Local;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcGateway;
@@ -51,7 +51,7 @@ class MessageSerializationTest {
     @BeforeAll
     static void setup() throws Exception {
         Configuration configuration = new Configuration();
-        configuration.set(AkkaOptions.FRAMESIZE, maxFrameSize + "b");
+        configuration.set(RpcOptions.FRAMESIZE, maxFrameSize + "b");
 
         rpcService1 =
                 PekkoRpcServiceUtils.remoteServiceBuilder(configuration, 
"localhost", 0)
diff --git 
a/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/pekko/PekkoRpcActorOversizedResponseMessageTest.java
 
b/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/pekko/PekkoRpcActorOversizedResponseMessageTest.java
index 9170ec784f6..ae3a9828e25 100644
--- 
a/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/pekko/PekkoRpcActorOversizedResponseMessageTest.java
+++ 
b/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/pekko/PekkoRpcActorOversizedResponseMessageTest.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.runtime.rpc.pekko;
 
-import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RpcOptions;
 import org.apache.flink.core.testutils.FlinkAssertions;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcGateway;
@@ -57,8 +57,8 @@ class PekkoRpcActorOversizedResponseMessageTest {
     static void setupClass() throws Exception {
         final Configuration configuration = new Configuration();
         // some tests explicitly test local communication where no 
serialization should occur
-        configuration.set(AkkaOptions.FORCE_RPC_INVOCATION_SERIALIZATION, 
false);
-        configuration.set(AkkaOptions.FRAMESIZE, FRAMESIZE + " b");
+        configuration.set(RpcOptions.FORCE_RPC_INVOCATION_SERIALIZATION, 
false);
+        configuration.set(RpcOptions.FRAMESIZE, FRAMESIZE + " b");
 
         rpcService1 =
                 PekkoRpcServiceUtils.remoteServiceBuilder(configuration, 
"localhost", 0)
diff --git 
a/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/pekko/PekkoUtilsTest.java
 
b/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/pekko/PekkoUtilsTest.java
index 78f300cf0c9..553b2cd5589 100644
--- 
a/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/pekko/PekkoUtilsTest.java
+++ 
b/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/pekko/PekkoUtilsTest.java
@@ -17,8 +17,8 @@
 
 package org.apache.flink.runtime.rpc.pekko;
 
-import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RpcOptions;
 import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.runtime.rpc.AddressResolution;
 import org.apache.flink.runtime.rpc.RpcSystem;
@@ -225,7 +225,7 @@ class PekkoUtilsTest {
     @Test
     void getConfigDefaultsStartupTimeoutTo10TimesOfAskTimeout() {
         final Configuration configuration = new Configuration();
-        configuration.set(AkkaOptions.ASK_TIMEOUT_DURATION, 
Duration.ofMillis(100));
+        configuration.set(RpcOptions.ASK_TIMEOUT_DURATION, 
Duration.ofMillis(100));
 
         final Config config =
                 PekkoUtils.getConfig(configuration, new 
HostAndPort("localhost", 31337));
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java
index 7b1a2281d8d..4514f890510 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java
@@ -21,10 +21,10 @@ package org.apache.flink.runtime.jobmaster;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.RpcOptions;
 import org.apache.flink.configuration.SchedulerExecutionMode;
 import org.apache.flink.core.failure.FailureEnricher;
 import org.apache.flink.runtime.blob.BlobWriter;
@@ -152,7 +152,7 @@ public final class DefaultSlotPoolServiceSchedulerFactory
             Configuration configuration, JobType jobType, boolean 
isDynamicGraph) {
 
         final Time rpcTimeout =
-                
Time.fromDuration(configuration.get(AkkaOptions.ASK_TIMEOUT_DURATION));
+                
Time.fromDuration(configuration.get(RpcOptions.ASK_TIMEOUT_DURATION));
         final Time slotIdleTimeout =
                 
Time.milliseconds(configuration.get(JobManagerOptions.SLOT_IDLE_TIMEOUT));
         final Time batchSlotTimeout =
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterConfiguration.java
index ce30f698034..47c23ee178b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterConfiguration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterConfiguration.java
@@ -19,10 +19,10 @@
 package org.apache.flink.runtime.jobmaster;
 
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ConfigurationUtils;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.RpcOptions;
 import org.apache.flink.runtime.registration.RetryingRegistrationConfiguration;
 import org.apache.flink.util.Preconditions;
 
@@ -75,7 +75,7 @@ public class JobMasterConfiguration {
     public static JobMasterConfiguration fromConfiguration(Configuration 
configuration) {
 
         final Time rpcTimeout =
-                
Time.fromDuration(configuration.get(AkkaOptions.ASK_TIMEOUT_DURATION));
+                
Time.fromDuration(configuration.get(RpcOptions.ASK_TIMEOUT_DURATION));
 
         final Time slotRequestTimeout =
                 
Time.milliseconds(configuration.get(JobManagerOptions.SLOT_REQUEST_TIMEOUT));
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
index 9905906f640..61166015d29 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
@@ -18,12 +18,12 @@
 
 package org.apache.flink.runtime.minicluster;
 
-import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.ClusterOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
 import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.RpcOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.configuration.UnmodifiableConfiguration;
 import org.apache.flink.core.plugin.PluginManager;
@@ -90,8 +90,8 @@ public class MiniClusterConfiguration {
         }
 
         // increase the ask.timeout if not set in order to harden tests on 
slow CI
-        if (!modifiedConfig.contains(AkkaOptions.ASK_TIMEOUT_DURATION)) {
-            modifiedConfig.set(AkkaOptions.ASK_TIMEOUT_DURATION, 
Duration.ofMinutes(5L));
+        if (!modifiedConfig.contains(RpcOptions.ASK_TIMEOUT_DURATION)) {
+            modifiedConfig.set(RpcOptions.ASK_TIMEOUT_DURATION, 
Duration.ofMinutes(5L));
         }
 
         return new UnmodifiableConfiguration(modifiedConfig);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerFactory.java
index fc473564b5e..3f28af9f31d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerFactory.java
@@ -20,10 +20,10 @@ package org.apache.flink.runtime.resourcemanager;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ConfigurationUtils;
 import org.apache.flink.configuration.ResourceManagerOptions;
+import org.apache.flink.configuration.RpcOptions;
 import org.apache.flink.runtime.blocklist.BlocklistUtils;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.entrypoint.ClusterInformation;
@@ -89,7 +89,7 @@ public final class StandaloneResourceManagerFactory extends 
ResourceManagerFacto
                 fatalErrorHandler,
                 resourceManagerMetricGroup,
                 standaloneClusterStartupPeriodTime,
-                
Time.fromDuration(configuration.get(AkkaOptions.ASK_TIMEOUT_DURATION)),
+                
Time.fromDuration(configuration.get(RpcOptions.ASK_TIMEOUT_DURATION)),
                 ioExecutor);
     }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java
index 40df2011358..1698a6c4e8b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java
@@ -20,8 +20,8 @@ package org.apache.flink.runtime.resourcemanager.active;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RpcOptions;
 import org.apache.flink.runtime.blocklist.BlocklistHandler;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
@@ -165,7 +165,7 @@ public class ActiveResourceManager<WorkerType extends 
ResourceIDRetrievable>
                 resourceManagerMetricGroup,
                 Time.fromDuration(
                         Preconditions.checkNotNull(flinkConfig)
-                                .get(AkkaOptions.ASK_TIMEOUT_DURATION)),
+                                .get(RpcOptions.ASK_TIMEOUT_DURATION)),
                 ioExecutor);
 
         this.flinkConfig = flinkConfig;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java
index b5d9734b128..66181662277 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java
@@ -20,10 +20,10 @@ package 
org.apache.flink.runtime.resourcemanager.slotmanager;
 
 import org.apache.flink.api.common.resources.CPUResource;
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.ResourceManagerOptions;
+import org.apache.flink.configuration.RpcOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
 import org.apache.flink.util.ConfigurationException;
@@ -238,7 +238,7 @@ public class SlotManagerConfiguration {
             throws ConfigurationException {
 
         final Time rpcTimeout =
-                
Time.fromDuration(configuration.get(AkkaOptions.ASK_TIMEOUT_DURATION));
+                
Time.fromDuration(configuration.get(RpcOptions.ASK_TIMEOUT_DURATION));
 
         final Time taskManagerTimeout =
                 
Time.milliseconds(configuration.get(ResourceManagerOptions.TASK_MANAGER_TIMEOUT));
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
index 3d437b04e1f..34959bcc3f0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
@@ -18,9 +18,9 @@
 
 package org.apache.flink.runtime.taskexecutor;
 
-import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ConfigurationUtils;
+import org.apache.flink.configuration.RpcOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.configuration.UnmodifiableConfiguration;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
@@ -193,7 +193,7 @@ public class TaskManagerConfiguration implements 
TaskManagerRuntimeInfo {
 
         final String[] tmpDirPaths = 
ConfigurationUtils.parseTempDirectories(configuration);
 
-        final Duration rpcTimeout = 
configuration.get(AkkaOptions.ASK_TIMEOUT_DURATION);
+        final Duration rpcTimeout = 
configuration.get(RpcOptions.ASK_TIMEOUT_DURATION);
 
         LOG.debug("Messages have a max timeout of " + rpcTimeout);
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index 726a414850e..26d3a290c78 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -21,9 +21,9 @@ package org.apache.flink.runtime.taskexecutor;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JMXServerOptions;
+import org.apache.flink.configuration.RpcOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.configuration.TaskManagerOptionsInternal;
 import org.apache.flink.core.fs.FileSystem;
@@ -171,7 +171,7 @@ public class TaskManagerRunner implements FatalErrorHandler 
{
         this.pluginManager = checkNotNull(pluginManager);
         this.taskExecutorServiceFactory = 
checkNotNull(taskExecutorServiceFactory);
 
-        timeout = 
Time.fromDuration(configuration.get(AkkaOptions.ASK_TIMEOUT_DURATION));
+        timeout = 
Time.fromDuration(configuration.get(RpcOptions.ASK_TIMEOUT_DURATION));
 
         this.terminationFuture = new CompletableFuture<>();
         this.shutdown = false;
@@ -725,7 +725,7 @@ public class TaskManagerRunner implements FatalErrorHandler 
{
             RpcSystemUtils rpcSystemUtils)
             throws LeaderRetrievalException {
 
-        final Duration lookupTimeout = 
configuration.get(AkkaOptions.LOOKUP_TIMEOUT_DURATION);
+        final Duration lookupTimeout = 
configuration.get(RpcOptions.LOOKUP_TIMEOUT_DURATION);
 
         final InetAddress taskManagerAddress =
                 LeaderRetrievalUtils.findConnectingAddress(
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
index 48b0e08dd7a..fbd5a456747 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
@@ -19,13 +19,13 @@
 package org.apache.flink.runtime.taskexecutor;
 
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ConfigurationUtils;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
+import org.apache.flink.configuration.RpcOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.configuration.TaskManagerOptionsInternal;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -289,7 +289,7 @@ public class TaskManagerServicesConfiguration {
                 QueryableStateConfiguration.fromConfiguration(configuration);
 
         long timerServiceShutdownTimeout =
-                configuration.get(AkkaOptions.ASK_TIMEOUT_DURATION).toMillis();
+                configuration.get(RpcOptions.ASK_TIMEOUT_DURATION).toMillis();
 
         final RetryingRegistrationConfiguration 
retryingRegistrationConfiguration =
                 
RetryingRegistrationConfiguration.fromConfiguration(configuration);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
index af658994f3c..e1d6c16778c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
@@ -21,10 +21,10 @@ package org.apache.flink.runtime.webmonitor;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.RpcOptions;
 import org.apache.flink.runtime.blob.TransientBlobService;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
 import org.apache.flink.runtime.leaderelection.LeaderContender;
@@ -267,7 +267,7 @@ public class WebMonitorEndpoint<T extends RestfulGateway> 
extends RestServerEndp
     }
 
     private VertexThreadInfoTracker 
initializeThreadInfoTracker(ScheduledExecutorService executor) {
-        final Duration askTimeout = 
clusterConfiguration.get(AkkaOptions.ASK_TIMEOUT_DURATION);
+        final Duration askTimeout = 
clusterConfiguration.get(RpcOptions.ASK_TIMEOUT_DURATION);
 
         final Duration flameGraphCleanUpInterval =
                 
clusterConfiguration.get(RestOptions.FLAMEGRAPH_CLEANUP_INTERVAL);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingDefaultExecutionGraphBuilder.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingDefaultExecutionGraphBuilder.java
index 4d2156701d6..58e38d62d45 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingDefaultExecutionGraphBuilder.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingDefaultExecutionGraphBuilder.java
@@ -19,8 +19,8 @@
 package org.apache.flink.runtime.executiongraph;
 
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RpcOptions;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.blob.BlobWriter;
@@ -60,7 +60,7 @@ public class TestingDefaultExecutionGraphBuilder {
         return new TestingDefaultExecutionGraphBuilder();
     }
 
-    private Time rpcTimeout = 
Time.fromDuration(AkkaOptions.ASK_TIMEOUT_DURATION.defaultValue());
+    private Time rpcTimeout = 
Time.fromDuration(RpcOptions.ASK_TIMEOUT_DURATION.defaultValue());
     private ClassLoader userClassLoader = 
DefaultExecutionGraph.class.getClassLoader();
     private BlobWriter blobWriter = VoidBlobWriter.getInstance();
     private ShuffleMaster<?> shuffleMaster = 
ShuffleTestUtils.DEFAULT_SHUFFLE_MASTER;
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
index 7514e04e4cb..f7158a39da1 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
@@ -18,9 +18,9 @@
 
 package org.apache.flink.runtime.io.network.partition;
 
-import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
+import org.apache.flink.configuration.RpcOptions;
 import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
@@ -62,7 +62,7 @@ class PartialConsumePipelinedResultTest {
 
     private static Configuration getFlinkConfiguration() {
         final Configuration config = new Configuration();
-        config.set(AkkaOptions.ASK_TIMEOUT_DURATION, 
TestingUtils.DEFAULT_ASK_TIMEOUT);
+        config.set(RpcOptions.ASK_TIMEOUT_DURATION, 
TestingUtils.DEFAULT_ASK_TIMEOUT);
         config.set(NettyShuffleEnvironmentOptions.NETWORK_NUM_BUFFERS, 
NUMBER_OF_NETWORK_BUFFERS);
 
         return config;
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java
index e5e4c896b56..53822036e33 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.runtime.jobmanager;
 
-import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RpcOptions;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.network.api.reader.RecordReader;
@@ -68,7 +68,7 @@ public class SlotCountExceedingParallelismTest extends 
TestLogger {
 
     private static Configuration getFlinkConfiguration() {
         final Configuration config = new Configuration();
-        config.set(AkkaOptions.ASK_TIMEOUT_DURATION, 
TestingUtils.DEFAULT_ASK_TIMEOUT);
+        config.set(RpcOptions.ASK_TIMEOUT_DURATION, 
TestingUtils.DEFAULT_ASK_TIMEOUT);
 
         return config;
     }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManagerFactory.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManagerFactory.java
index 67bc11a4266..8cdcb3e610e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManagerFactory.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManagerFactory.java
@@ -19,8 +19,8 @@
 package org.apache.flink.runtime.resourcemanager;
 
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RpcOptions;
 import org.apache.flink.runtime.blocklist.BlocklistHandler;
 import org.apache.flink.runtime.blocklist.BlocklistUtils;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
@@ -101,7 +101,7 @@ public class TestingResourceManagerFactory extends 
ResourceManagerFactory<Resour
                 clusterInformation,
                 fatalErrorHandler,
                 resourceManagerMetricGroup,
-                
Time.fromDuration(configuration.get(AkkaOptions.ASK_TIMEOUT_DURATION)),
+                
Time.fromDuration(configuration.get(RpcOptions.ASK_TIMEOUT_DURATION)),
                 ioExecutor);
     }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java
index c9863579819..4a6a760636c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.runtime.rpc;
 
-import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RpcOptions;
 import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 
@@ -41,7 +41,7 @@ class RpcConnectionTest {
         // we start the RPC service with a very long timeout to ensure that 
the test
         // can only pass if the connection problem is not recognized merely 
via a timeout
         Configuration configuration = new Configuration();
-        configuration.set(AkkaOptions.ASK_TIMEOUT_DURATION, 
Duration.ofSeconds(10000000));
+        configuration.set(RpcOptions.ASK_TIMEOUT_DURATION, 
Duration.ofSeconds(10000000));
 
         try (RpcSystem rpcSystem = RpcSystem.load()) {
             final RpcService rpcService =
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcSSLAuthITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcSSLAuthITCase.java
index c6099b169fb..7828cc366e1 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcSSLAuthITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcSSLAuthITCase.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.runtime.rpc;
 
-import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RpcOptions;
 import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
 import org.apache.flink.util.concurrent.FutureUtils;
@@ -50,10 +50,10 @@ class RpcSSLAuthITCase {
     @Test
     void testConnectFailure() throws Exception {
         final Configuration baseConfig = new Configuration();
-        baseConfig.set(AkkaOptions.TCP_TIMEOUT, "1 s");
+        baseConfig.set(RpcOptions.TCP_TIMEOUT, "1 s");
         // we start the RPC service with a very long timeout to ensure that 
the test
         // can only pass if the connection problem is not recognized merely 
via a timeout
-        baseConfig.set(AkkaOptions.ASK_TIMEOUT_DURATION, 
Duration.ofSeconds(10000000));
+        baseConfig.set(RpcOptions.ASK_TIMEOUT_DURATION, 
Duration.ofSeconds(10000000));
 
         // !!! This config has KEY_STORE_FILE / TRUST_STORE_FILE !!!
         Configuration sslConfig1 = new Configuration(baseConfig);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerConfigurationTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerConfigurationTest.java
index f30ac260f4f..8d7847a822f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerConfigurationTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerConfigurationTest.java
@@ -18,11 +18,11 @@
 
 package org.apache.flink.runtime.taskexecutor;
 
-import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.RpcOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.configuration.TaskManagerOptionsInternal;
 import org.apache.flink.configuration.UnmodifiableConfiguration;
@@ -259,7 +259,7 @@ class TaskManagerRunnerConfigurationTest {
         final Configuration config = new Configuration();
         config.set(TaskManagerOptions.HOST_BIND_POLICY, bindPolicy.toString());
         config.set(JobManagerOptions.ADDRESS, "localhost");
-        config.set(AkkaOptions.LOOKUP_TIMEOUT_DURATION, Duration.ofMillis(10));
+        config.set(RpcOptions.LOOKUP_TIMEOUT_DURATION, Duration.ofMillis(10));
         return new UnmodifiableConfiguration(config);
     }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterResourceConfiguration.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterResourceConfiguration.java
index 7aec05cbeb3..22ec9dbe70e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterResourceConfiguration.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterResourceConfiguration.java
@@ -19,9 +19,9 @@
 package org.apache.flink.runtime.testutils;
 
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.RpcOptions;
 import org.apache.flink.configuration.UnmodifiableConfiguration;
 import 
org.apache.flink.runtime.highavailability.nonha.embedded.HaLeadershipControl;
 import org.apache.flink.runtime.minicluster.MiniCluster;
@@ -91,7 +91,7 @@ public class MiniClusterResourceConfiguration {
         private int numberTaskManagers = 1;
         private int numberSlotsPerTaskManager = 1;
         private Time shutdownTimeout =
-                
Time.fromDuration(configuration.get(AkkaOptions.ASK_TIMEOUT_DURATION));
+                
Time.fromDuration(configuration.get(RpcOptions.ASK_TIMEOUT_DURATION));
 
         private RpcServiceSharing rpcServiceSharing = RpcServiceSharing.SHARED;
         private MiniCluster.HaServices haServices = 
MiniCluster.HaServices.CONFIGURED;
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
index d1f9f0324fe..86662095af4 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
@@ -18,10 +18,10 @@
 
 package org.apache.flink.runtime.testutils;
 
-import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.RpcOptions;
 import org.apache.flink.configuration.StateBackendOptions;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 
@@ -121,7 +121,7 @@ public class ZooKeeperTestUtils {
         config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, 
fsStateHandlePath + "/checkpoints");
         config.set(HighAvailabilityOptions.HA_STORAGE_PATH, fsStateHandlePath 
+ "/recovery");
 
-        config.set(AkkaOptions.ASK_TIMEOUT_DURATION, Duration.ofSeconds(100));
+        config.set(RpcOptions.ASK_TIMEOUT_DURATION, Duration.ofSeconds(100));
 
         return config;
     }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index acdd07735b5..179a3cb13f2 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -51,7 +51,7 @@ import org.apache.flink.api.java.io.TextOutputFormat;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.configuration.AkkaOptions;
+import org.apache.flink.configuration.RpcOptions;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.core.fs.FileSystem.WriteMode;
 import org.apache.flink.core.fs.Path;
@@ -1443,7 +1443,7 @@ public class DataStream<T> {
                 new CollectSinkOperatorFactory<>(serializer, accumulatorName);
         CollectSinkOperator<T> operator = (CollectSinkOperator<T>) 
factory.getOperator();
         long resultFetchTimeout =
-                
env.getConfiguration().get(AkkaOptions.ASK_TIMEOUT_DURATION).toMillis();
+                
env.getConfiguration().get(RpcOptions.ASK_TIMEOUT_DURATION).toMillis();
         CollectResultIterator<T> iterator =
                 new CollectResultIterator<>(
                         operator.getOperatorIdFuture(),
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectResultIterator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectResultIterator.java
index f1562be0135..a6dd08b8f39 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectResultIterator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectResultIterator.java
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.api.operators.collect;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.configuration.AkkaOptions;
+import org.apache.flink.configuration.RpcOptions;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.streaming.api.CheckpointingMode;
@@ -88,7 +88,7 @@ public class CollectResultIterator<T> implements 
CloseableIterator<T> {
                         operatorIdFuture,
                         accumulatorName,
                         retryMillis,
-                        
AkkaOptions.ASK_TIMEOUT_DURATION.defaultValue().toMillis());
+                        
RpcOptions.ASK_TIMEOUT_DURATION.defaultValue().toMillis());
         this.bufferedResult = null;
     }
 
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/CollectDynamicSink.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/CollectDynamicSink.java
index 0a45f019dd9..4ce00d58bff 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/CollectDynamicSink.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/CollectDynamicSink.java
@@ -21,9 +21,9 @@ package org.apache.flink.table.planner.connectors;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.configuration.RpcOptions;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
@@ -130,7 +130,7 @@ public final class CollectDynamicSink implements 
DynamicTableSink {
                         inputStream
                                 .getExecutionEnvironment()
                                 .getConfiguration()
-                                .get(AkkaOptions.ASK_TIMEOUT_DURATION)
+                                .get(RpcOptions.ASK_TIMEOUT_DURATION)
                                 .toMillis();
 
                 iterator =
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecDynamicFilteringDataCollector.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecDynamicFilteringDataCollector.java
index f38181badfc..d51337f5002 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecDynamicFilteringDataCollector.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecDynamicFilteringDataCollector.java
@@ -20,10 +20,10 @@ package 
org.apache.flink.table.planner.plan.nodes.exec.batch;
 
 import org.apache.flink.annotation.Experimental;
 import org.apache.flink.api.dag.Transformation;
-import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.configuration.RpcOptions;
 import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.planner.delegation.PlannerBase;
@@ -61,7 +61,7 @@ public class BatchExecDynamicFilteringDataCollector extends 
ExecNodeBase<Object>
                             "If the collector collects more data than the 
threshold (default is 8M), "
                                     + "an empty DynamicFilterEvent with a flag 
only will be sent to Coordinator, "
                                     + "which could avoid exceeding the pekko 
limit and out-of-memory (see "
-                                    + AkkaOptions.FRAMESIZE.key()
+                                    + RpcOptions.FRAMESIZE.key()
                                     + "). Otherwise a DynamicFilterEvent with 
all deduplicated records will be sent to Coordinator.");
 
     private final List<Integer> dynamicFilteringFieldIndices;
diff --git 
a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SinkTestSuiteBase.java
 
b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SinkTestSuiteBase.java
index 793e5b0bf9c..2fe57682f2c 100644
--- 
a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SinkTestSuiteBase.java
+++ 
b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SinkTestSuiteBase.java
@@ -26,8 +26,8 @@ import 
org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.connector.sink2.Sink;
 import org.apache.flink.api.connector.source.Boundedness;
-import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RpcOptions;
 import org.apache.flink.connector.testframe.environment.TestEnvironment;
 import 
org.apache.flink.connector.testframe.environment.TestEnvironmentSettings;
 import org.apache.flink.connector.testframe.external.ExternalSystemDataReader;
@@ -614,7 +614,7 @@ public abstract class SinkTestSuiteBase<T extends 
Comparable<T>> {
                 serializer,
                 accumulatorName,
                 stream.getExecutionEnvironment().getCheckpointConfig(),
-                AkkaOptions.ASK_TIMEOUT_DURATION.defaultValue().toMillis());
+                RpcOptions.ASK_TIMEOUT_DURATION.defaultValue().toMillis());
     }
 
     private void waitExpectedSizeData(CollectResultIterator<T> iterator, int 
targetNum) {
diff --git 
a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SourceTestSuiteBase.java
 
b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SourceTestSuiteBase.java
index 3efed9d4c16..7f5f0d3013a 100644
--- 
a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SourceTestSuiteBase.java
+++ 
b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SourceTestSuiteBase.java
@@ -26,8 +26,8 @@ import 
org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.api.connector.source.Source;
-import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RpcOptions;
 import org.apache.flink.connector.testframe.environment.ClusterControllable;
 import org.apache.flink.connector.testframe.environment.TestEnvironment;
 import 
org.apache.flink.connector.testframe.environment.TestEnvironmentSettings;
@@ -795,7 +795,7 @@ public abstract class SourceTestSuiteBase<T> {
                             serializer,
                             accumulatorName,
                             checkpointConfig,
-                            
AkkaOptions.ASK_TIMEOUT_DURATION.defaultValue().toMillis());
+                            
RpcOptions.ASK_TIMEOUT_DURATION.defaultValue().toMillis());
             iterator.setJobClient(jobClient);
             return iterator;
         }
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
index a9087f9b249..96219147c30 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
@@ -27,9 +27,9 @@ import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.client.program.ClusterClient;
-import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HeartbeatManagerOptions;
+import org.apache.flink.configuration.RpcOptions;
 import org.apache.flink.core.testutils.CheckedThread;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.optimizer.DataStatistics;
@@ -102,7 +102,7 @@ public class AccumulatorLiveITCase extends TestLogger {
 
     private static Configuration getConfiguration() {
         Configuration config = new Configuration();
-        config.set(AkkaOptions.ASK_TIMEOUT_DURATION, 
TestingUtils.DEFAULT_ASK_TIMEOUT);
+        config.set(RpcOptions.ASK_TIMEOUT_DURATION, 
TestingUtils.DEFAULT_ASK_TIMEOUT);
         config.set(HeartbeatManagerOptions.HEARTBEAT_INTERVAL, 
HEARTBEAT_INTERVAL);
 
         return config;
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
index cd9206c781e..5bcd17b9826 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
@@ -22,11 +22,11 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.client.program.ClusterClient;
-import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
+import org.apache.flink.configuration.RpcOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.optimizer.DataStatistics;
 import org.apache.flink.optimizer.Optimizer;
@@ -85,7 +85,7 @@ public abstract class CancelingTestBase extends TestLogger {
         verifyJvmOptions();
         Configuration config = new Configuration();
         config.set(CoreOptions.FILESYTEM_DEFAULT_OVERRIDE, true);
-        config.set(AkkaOptions.ASK_TIMEOUT_DURATION, 
TestingUtils.DEFAULT_ASK_TIMEOUT);
+        config.set(RpcOptions.ASK_TIMEOUT_DURATION, 
TestingUtils.DEFAULT_ASK_TIMEOUT);
         config.set(TaskManagerOptions.MEMORY_SEGMENT_SIZE, 
MemorySize.parse("4096"));
         config.set(NettyShuffleEnvironmentOptions.NETWORK_NUM_BUFFERS, 2048);
 
@@ -99,7 +99,7 @@ public abstract class CancelingTestBase extends TestLogger {
         // submit job
         final JobGraph jobGraph = getJobGraph(plan);
 
-        final long rpcTimeout = 
configuration.get(AkkaOptions.ASK_TIMEOUT_DURATION).toMillis();
+        final long rpcTimeout = 
configuration.get(RpcOptions.ASK_TIMEOUT_DURATION).toMillis();
 
         ClusterClient<?> client = CLUSTER.getClusterClient();
         JobID jobID = client.submitJob(jobGraph).get();
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
index dae5e0aef99..0fe64cf0377 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
@@ -23,9 +23,9 @@ import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple4;
-import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.RpcOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -72,8 +72,8 @@ public class EventTimeAllWindowCheckpointingITCase extends 
TestLogger {
     private static Configuration getConfiguration() {
         Configuration config = new Configuration();
         config.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, 
MemorySize.parse("48m"));
-        config.set(AkkaOptions.LOOKUP_TIMEOUT_DURATION, Duration.ofMinutes(1));
-        config.set(AkkaOptions.ASK_TIMEOUT_DURATION, Duration.ofMinutes(1));
+        config.set(RpcOptions.LOOKUP_TIMEOUT_DURATION, Duration.ofMinutes(1));
+        config.set(RpcOptions.ASK_TIMEOUT_DURATION, Duration.ofMinutes(1));
         return config;
     }
 
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
index d82b77b63cd..6405546d3d4 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
@@ -27,11 +27,11 @@ import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.changelog.fs.FsStateChangelogStorageFactory;
-import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
+import org.apache.flink.configuration.RpcOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
 import org.apache.flink.contrib.streaming.state.RocksDBOptions;
@@ -236,7 +236,7 @@ public class EventTimeWindowCheckpointingITCase extends 
TestLogger {
         final File haDir = temporaryFolder.newFolder();
 
         Configuration config = new Configuration();
-        config.set(AkkaOptions.FRAMESIZE, String.valueOf(MAX_MEM_STATE_SIZE) + 
"b");
+        config.set(RpcOptions.FRAMESIZE, String.valueOf(MAX_MEM_STATE_SIZE) + 
"b");
 
         if (zkServer != null) {
             config.set(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java
index 4218be61e60..80084bc3fbc 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java
@@ -42,11 +42,11 @@ import 
org.apache.flink.api.connector.source.SplitEnumerator;
 import org.apache.flink.api.connector.source.SplitEnumeratorContext;
 import org.apache.flink.api.connector.source.SplitsAssignment;
 import org.apache.flink.changelog.fs.FsStateChangelogStorageFactory;
-import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
+import org.apache.flink.configuration.RpcOptions;
 import org.apache.flink.configuration.StateBackendOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.io.InputStatus;
@@ -785,7 +785,7 @@ public abstract class UnalignedCheckpointTestBase extends 
TestLogger {
             // amount of buffers
             conf.set(TaskManagerOptions.NETWORK_MEMORY_MIN, 
MemorySize.ofMebiBytes(32));
             conf.set(TaskManagerOptions.NETWORK_MEMORY_MAX, 
MemorySize.ofMebiBytes(32));
-            conf.set(AkkaOptions.ASK_TIMEOUT_DURATION, Duration.ofMinutes(1));
+            conf.set(RpcOptions.ASK_TIMEOUT_DURATION, Duration.ofMinutes(1));
             return conf;
         }
 
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
index 1c484d3213a..c1a8fe9e95a 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
@@ -24,10 +24,10 @@ import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.MiniClusterClient;
 import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.client.program.ProgramInvocationException;
-import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.RpcOptions;
 import org.apache.flink.configuration.StateBackendOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.execution.SavepointFormatType;
@@ -136,7 +136,7 @@ public class ClassLoaderITCase extends TestLogger {
 
         // some tests check for serialization problems related to class-loading
         // this requires all RPCs to actually go through serialization
-        config.set(AkkaOptions.FORCE_RPC_INVOCATION_SERIALIZATION, true);
+        config.set(RpcOptions.FORCE_RPC_INVOCATION_SERIALIZATION, true);
 
         miniClusterResource =
                 new MiniClusterResource(
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingCustomInputSplitProgram.java
 
b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingCustomInputSplitProgram.java
index 993f9eaa0ce..a911f1a0fd4 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingCustomInputSplitProgram.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingCustomInputSplitProgram.java
@@ -25,8 +25,8 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RpcOptions;
 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.core.io.InputSplitAssigner;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -46,7 +46,7 @@ public class StreamingCustomInputSplitProgram {
     public static void main(String[] args) throws Exception {
         Configuration config = new Configuration();
 
-        config.set(AkkaOptions.ASK_TIMEOUT_DURATION, Duration.ofSeconds(5));
+        config.set(RpcOptions.ASK_TIMEOUT_DURATION, Duration.ofSeconds(5));
 
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
 
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java
index 5e97398f775..17b73c9c847 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java
@@ -22,8 +22,8 @@ import 
org.apache.flink.api.common.functions.RichMapPartitionFunction;
 import org.apache.flink.api.common.io.GenericInputFormat;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RpcOptions;
 import org.apache.flink.core.io.GenericInputSplit;
 import org.apache.flink.runtime.testutils.MiniClusterResource;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
@@ -60,7 +60,7 @@ public class RemoteEnvironmentITCase extends TestLogger {
     @Test
     public void testUserSpecificParallelism() throws Exception {
         Configuration config = new Configuration();
-        config.set(AkkaOptions.STARTUP_TIMEOUT, VALID_STARTUP_TIMEOUT);
+        config.set(RpcOptions.STARTUP_TIMEOUT, VALID_STARTUP_TIMEOUT);
 
         final URI restAddress = MINI_CLUSTER_RESOURCE.getRestAddress();
         final String hostname = restAddress.getHost();
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
index 165e8326f16..246399b1a26 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
@@ -23,12 +23,12 @@ import 
org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.client.program.ProgramInvocationException;
-import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.RpcOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.core.testutils.EachCallbackWrapper;
@@ -110,7 +110,7 @@ class ProcessFailureCancelingITCase {
 
         Configuration config = new Configuration();
         config.set(JobManagerOptions.ADDRESS, "localhost");
-        config.set(AkkaOptions.ASK_TIMEOUT_DURATION, Duration.ofSeconds(100));
+        config.set(RpcOptions.ASK_TIMEOUT_DURATION, Duration.ofSeconds(100));
         config.set(HighAvailabilityOptions.HA_MODE, "zookeeper");
         config.set(
                 HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM,
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerDisconnectOnShutdownITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerDisconnectOnShutdownITCase.java
index e2c575387d5..a188c7b4379 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerDisconnectOnShutdownITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerDisconnectOnShutdownITCase.java
@@ -19,13 +19,13 @@
 package org.apache.flink.test.recovery;
 
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ConfigurationUtils;
 import org.apache.flink.configuration.HeartbeatManagerOptions;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.RpcOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.blocklist.BlocklistUtils;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -207,7 +207,7 @@ public class TaskManagerDisconnectOnShutdownITCase {
                     fatalErrorHandler,
                     resourceManagerMetricGroup,
                     standaloneClusterStartupPeriodTime,
-                    
Time.fromDuration(configuration.get(AkkaOptions.ASK_TIMEOUT_DURATION)),
+                    
Time.fromDuration(configuration.get(RpcOptions.ASK_TIMEOUT_DURATION)),
                     ioExecutor) {
 
                 @Override
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerRunnerITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerRunnerITCase.java
index b59f1614baa..0fe8f4d5f7c 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerRunnerITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerRunnerITCase.java
@@ -18,10 +18,10 @@
 
 package org.apache.flink.test.recovery;
 
-import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.ClusterOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.RpcOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.entrypoint.ClusterEntrypointUtils;
@@ -62,7 +62,7 @@ public class TaskManagerRunnerITCase extends TestLogger {
                 ClusterOptions.PROCESS_WORKING_DIR_BASE, 
workingDirBase.getAbsolutePath());
         configuration.set(TaskManagerOptions.TASK_MANAGER_RESOURCE_ID, 
resourceId.toString());
         configuration.set(JobManagerOptions.ADDRESS, "localhost");
-        configuration.set(AkkaOptions.LOOKUP_TIMEOUT_DURATION, Duration.ZERO);
+        configuration.set(RpcOptions.LOOKUP_TIMEOUT_DURATION, Duration.ZERO);
 
         final File workingDirectory =
                 ClusterEntrypointUtils.generateTaskManagerWorkingDirectoryFile(
@@ -99,7 +99,7 @@ public class TaskManagerRunnerITCase extends TestLogger {
         configuration.set(
                 ClusterOptions.PROCESS_WORKING_DIR_BASE, 
workingDirBase.getAbsolutePath());
         configuration.set(JobManagerOptions.ADDRESS, "localhost");
-        configuration.set(AkkaOptions.LOOKUP_TIMEOUT_DURATION, Duration.ZERO);
+        configuration.set(RpcOptions.LOOKUP_TIMEOUT_DURATION, Duration.ZERO);
 
         final TestProcessBuilder.TestProcess taskManagerProcess =
                 new 
TestProcessBuilder(TaskExecutorProcessEntryPoint.class.getName())
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/runtime/ShuffleCompressionITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/runtime/ShuffleCompressionITCase.java
index 9a092c8147a..99965735b4c 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/runtime/ShuffleCompressionITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/runtime/ShuffleCompressionITCase.java
@@ -20,9 +20,9 @@ package org.apache.flink.test.runtime;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.ExecutionMode;
-import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
+import org.apache.flink.configuration.RpcOptions;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.network.api.reader.MutableRecordReader;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
@@ -85,7 +85,7 @@ public class ShuffleCompressionITCase {
     public void testNoDataCompressionForBoundedBlockingShuffle() throws 
Exception {
         Configuration configuration = new Configuration();
         
configuration.set(NettyShuffleEnvironmentOptions.BATCH_SHUFFLE_COMPRESSION_ENABLED,
 false);
-        configuration.set(AkkaOptions.ASK_TIMEOUT_DURATION, 
Duration.ofMinutes(1));
+        configuration.set(RpcOptions.ASK_TIMEOUT_DURATION, 
Duration.ofMinutes(1));
         configuration.set(
                 
NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_PARALLELISM,
                 Integer.MAX_VALUE);
@@ -98,7 +98,7 @@ public class ShuffleCompressionITCase {
     public void testNoDataCompressionForSortMergeBlockingShuffle() throws 
Exception {
         Configuration configuration = new Configuration();
         
configuration.set(NettyShuffleEnvironmentOptions.BATCH_SHUFFLE_COMPRESSION_ENABLED,
 false);
-        configuration.set(AkkaOptions.ASK_TIMEOUT_DURATION, 
Duration.ofMinutes(1));
+        configuration.set(RpcOptions.ASK_TIMEOUT_DURATION, 
Duration.ofMinutes(1));
 
         JobGraph jobGraph = createJobGraph(ResultPartitionType.BLOCKING, 
ExecutionMode.BATCH);
         JobGraphRunningUtil.execute(jobGraph, configuration, NUM_TASKMANAGERS, 
NUM_SLOTS);
diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNApplicationITCase.java
 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNApplicationITCase.java
index de4cb0c5b03..05f5d25638f 100644
--- 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNApplicationITCase.java
+++ 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNApplicationITCase.java
@@ -21,12 +21,12 @@ package org.apache.flink.yarn;
 import org.apache.flink.client.deployment.ClusterSpecification;
 import org.apache.flink.client.deployment.application.ApplicationConfiguration;
 import org.apache.flink.client.program.ClusterClient;
-import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.PipelineOptions;
+import org.apache.flink.configuration.RpcOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.yarn.configuration.YarnConfigOptions;
 import org.apache.flink.yarn.configuration.YarnDeploymentTarget;
@@ -131,7 +131,7 @@ class YARNApplicationITCase extends YarnTestBase {
         Configuration configuration = new Configuration();
         configuration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, 
MemorySize.ofMebiBytes(768));
         configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, 
MemorySize.parse("1g"));
-        configuration.set(AkkaOptions.ASK_TIMEOUT_DURATION, 
Duration.ofSeconds(30));
+        configuration.set(RpcOptions.ASK_TIMEOUT_DURATION, 
Duration.ofSeconds(30));
         configuration.set(DeploymentOptions.TARGET, 
YarnDeploymentTarget.APPLICATION.getName());
         configuration.set(CLASSPATH_INCLUDE_USER_JAR, userJarInclusion);
         configuration.set(PipelineOptions.JARS, 
Collections.singletonList(userJar.toString()));
diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNFileReplicationITCase.java
 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNFileReplicationITCase.java
index 61bb1f50421..84975185d3b 100644
--- 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNFileReplicationITCase.java
+++ 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNFileReplicationITCase.java
@@ -20,10 +20,10 @@ package org.apache.flink.yarn;
 
 import org.apache.flink.client.deployment.ClusterSpecification;
 import org.apache.flink.client.program.ClusterClient;
-import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.RpcOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmaster.JobResult;
@@ -152,7 +152,7 @@ class YARNFileReplicationITCase extends YarnTestBase {
         final Configuration configuration = new Configuration();
         configuration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, 
MemorySize.ofMebiBytes(768));
         configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, 
MemorySize.parse("1g"));
-        configuration.set(AkkaOptions.ASK_TIMEOUT_DURATION, 
Duration.ofSeconds(30));
+        configuration.set(RpcOptions.ASK_TIMEOUT_DURATION, 
Duration.ofSeconds(30));
         configuration.set(CLASSPATH_INCLUDE_USER_JAR, 
YarnConfigOptions.UserJarInclusion.DISABLED);
 
         return configuration;
diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
index 551f355adea..4a1963ced5e 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
@@ -21,10 +21,10 @@ package org.apache.flink.yarn;
 import org.apache.flink.api.common.cache.DistributedCache;
 import org.apache.flink.client.deployment.ClusterSpecification;
 import org.apache.flink.client.program.ClusterClient;
-import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.RpcOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobType;
@@ -214,7 +214,7 @@ class YARNITCase extends YarnTestBase {
         Configuration configuration = new Configuration();
         configuration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, 
MemorySize.ofMebiBytes(768));
         configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, 
MemorySize.parse("1g"));
-        configuration.set(AkkaOptions.ASK_TIMEOUT_DURATION, 
Duration.ofSeconds(30));
+        configuration.set(RpcOptions.ASK_TIMEOUT_DURATION, 
Duration.ofSeconds(30));
         configuration.set(CLASSPATH_INCLUDE_USER_JAR, userJarInclusion);
 
         return configuration;
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
index ab8986adccf..48efd848e07 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
@@ -19,8 +19,8 @@
 package org.apache.flink.yarn;
 
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RpcOptions;
 import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.configuration.TaskManagerOptionsInternal;
@@ -118,7 +118,7 @@ public class YarnTaskExecutorRunner {
         LOG.info("TM: keytab principal obtained {}", keytabPrincipal);
 
         // tell pekko to die in case of an error
-        configuration.set(AkkaOptions.JVM_EXIT_ON_FATAL_ERROR, true);
+        configuration.set(RpcOptions.JVM_EXIT_ON_FATAL_ERROR, true);
 
         String keytabPath = Utils.resolveKeytabPath(currDir, localKeytabPath);
 
diff --git 
a/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java 
b/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
index b183fd2604e..f596e476a6f 100644
--- 
a/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
+++ 
b/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
@@ -27,13 +27,13 @@ import 
org.apache.flink.client.deployment.ClusterClientFactory;
 import org.apache.flink.client.deployment.ClusterClientServiceLoader;
 import org.apache.flink.client.deployment.ClusterSpecification;
 import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
-import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.RpcOptions;
 import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.util.ConfigurationException;
@@ -100,7 +100,7 @@ class FlinkYarnSessionCliTest {
                             "-j",
                             "fake.jar",
                             "-D",
-                            AkkaOptions.ASK_TIMEOUT_DURATION.key() + "=5 min",
+                            RpcOptions.ASK_TIMEOUT_DURATION.key() + "=5 min",
                             "-D",
                             CoreOptions.FLINK_JVM_OPTIONS.key() + 
"=-DappName=foobar",
                             "-D",
@@ -108,7 +108,7 @@ class FlinkYarnSessionCliTest {
                         });
 
         Configuration executorConfig = cli.toConfiguration(cmd);
-        
assertThat(executorConfig.get(AkkaOptions.ASK_TIMEOUT_DURATION)).hasMinutes(5);
+        
assertThat(executorConfig.get(RpcOptions.ASK_TIMEOUT_DURATION)).hasMinutes(5);
         
assertThat(executorConfig.get(CoreOptions.FLINK_JVM_OPTIONS)).isEqualTo("-DappName=foobar");
         
assertThat(executorConfig.get(SecurityOptions.SSL_INTERNAL_KEY_PASSWORD))
                 .isEqualTo("changeit");

Reply via email to