This is an automated email from the ASF dual-hosted git repository.
mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new c678244a389 [FLINK-32684][rpc] Introduces RpcOptions and deprecates
AkkaOptions (#24188)
c678244a389 is described below
commit c678244a3890273145a786b9e1bf1a4f96f6dcfd
Author: Matthias Pohl <[email protected]>
AuthorDate: Tue Jan 30 09:57:37 2024 +0100
[FLINK-32684][rpc] Introduces RpcOptions and deprecates AkkaOptions (#24188)
* Renames AkkaOptions into RpcOptions using the IDE and updates the class'
JavaDoc
* Copies RpcOptions to AkkaOptions and deprecates AkkaOptions.
Additionally, makes each option reference the corresponding RpcOption member.
* Moves deprecated RPC options out of RpcOptions and updates references
---
.../apache/flink/client/program/ClientTest.java | 5 +-
.../apache/flink/configuration/AkkaOptions.java | 265 ++++-----------------
.../flink/configuration/ConfigConstants.java | 42 ++--
.../{AkkaOptions.java => RpcOptions.java} | 108 +--------
.../flink/configuration/TaskManagerOptions.java | 6 +-
.../rpc/pekko/ActorSystemBootstrapTools.java | 16 +-
.../runtime/rpc/pekko/PekkoInvocationHandler.java | 4 +-
.../rpc/pekko/PekkoRpcServiceConfiguration.java | 8 +-
.../runtime/rpc/pekko/PekkoRpcServiceUtils.java | 6 +-
.../apache/flink/runtime/rpc/pekko/PekkoUtils.java | 34 +--
.../rpc/pekko/MessageSerializationTest.java | 4 +-
.../PekkoRpcActorOversizedResponseMessageTest.java | 6 +-
.../flink/runtime/rpc/pekko/PekkoUtilsTest.java | 4 +-
.../DefaultSlotPoolServiceSchedulerFactory.java | 4 +-
.../runtime/jobmaster/JobMasterConfiguration.java | 4 +-
.../minicluster/MiniClusterConfiguration.java | 6 +-
.../StandaloneResourceManagerFactory.java | 4 +-
.../active/ActiveResourceManager.java | 4 +-
.../slotmanager/SlotManagerConfiguration.java | 4 +-
.../taskexecutor/TaskManagerConfiguration.java | 4 +-
.../runtime/taskexecutor/TaskManagerRunner.java | 6 +-
.../TaskManagerServicesConfiguration.java | 4 +-
.../runtime/webmonitor/WebMonitorEndpoint.java | 4 +-
.../TestingDefaultExecutionGraphBuilder.java | 4 +-
.../PartialConsumePipelinedResultTest.java | 4 +-
.../SlotCountExceedingParallelismTest.java | 4 +-
.../TestingResourceManagerFactory.java | 4 +-
.../flink/runtime/rpc/RpcConnectionTest.java | 4 +-
.../apache/flink/runtime/rpc/RpcSSLAuthITCase.java | 6 +-
.../TaskManagerRunnerConfigurationTest.java | 4 +-
.../MiniClusterResourceConfiguration.java | 4 +-
.../runtime/testutils/ZooKeeperTestUtils.java | 4 +-
.../flink/streaming/api/datastream/DataStream.java | 4 +-
.../operators/collect/CollectResultIterator.java | 4 +-
.../planner/connectors/CollectDynamicSink.java | 4 +-
.../BatchExecDynamicFilteringDataCollector.java | 4 +-
.../testframe/testsuites/SinkTestSuiteBase.java | 4 +-
.../testframe/testsuites/SourceTestSuiteBase.java | 4 +-
.../test/accumulators/AccumulatorLiveITCase.java | 4 +-
.../flink/test/cancelling/CancelingTestBase.java | 6 +-
.../EventTimeAllWindowCheckpointingITCase.java | 6 +-
.../EventTimeWindowCheckpointingITCase.java | 4 +-
.../checkpointing/UnalignedCheckpointTestBase.java | 4 +-
.../flink/test/classloading/ClassLoaderITCase.java | 4 +-
.../jar/StreamingCustomInputSplitProgram.java | 4 +-
.../test/operators/RemoteEnvironmentITCase.java | 4 +-
.../recovery/ProcessFailureCancelingITCase.java | 4 +-
.../TaskManagerDisconnectOnShutdownITCase.java | 4 +-
.../test/recovery/TaskManagerRunnerITCase.java | 6 +-
.../test/runtime/ShuffleCompressionITCase.java | 6 +-
.../apache/flink/yarn/YARNApplicationITCase.java | 4 +-
.../flink/yarn/YARNFileReplicationITCase.java | 4 +-
.../java/org/apache/flink/yarn/YARNITCase.java | 4 +-
.../apache/flink/yarn/YarnTaskExecutorRunner.java | 4 +-
.../apache/flink/yarn/FlinkYarnSessionCliTest.java | 6 +-
55 files changed, 200 insertions(+), 492 deletions(-)
diff --git
a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
index 3e591162ba9..7b53a79a147 100644
---
a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
+++
b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
@@ -33,13 +33,13 @@ import org.apache.flink.client.FlinkPipelineTranslationUtil;
import org.apache.flink.client.cli.ExecutionConfigAccessor;
import org.apache.flink.client.deployment.ClusterClientJobClientAdapter;
import org.apache.flink.client.testjar.ForbidConfigurationJob;
-import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.ConfigUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.PipelineOptions;
+import org.apache.flink.configuration.RpcOptions;
import org.apache.flink.core.execution.DetachedJobExecutionResult;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.PipelineExecutor;
@@ -101,8 +101,7 @@ class ClientTest {
config = new Configuration();
config.set(JobManagerOptions.ADDRESS, "localhost");
- config.set(
- AkkaOptions.ASK_TIMEOUT_DURATION,
AkkaOptions.ASK_TIMEOUT_DURATION.defaultValue());
+ config.set(RpcOptions.ASK_TIMEOUT_DURATION,
RpcOptions.ASK_TIMEOUT_DURATION.defaultValue());
}
private Configuration fromPackagedProgram(
diff --git
a/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java
b/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java
index 5f3cccc495b..236ec3e3511 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java
@@ -18,154 +18,78 @@
package org.apache.flink.configuration;
-import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.annotation.docs.Documentation;
import org.apache.flink.configuration.description.Description;
import org.apache.flink.util.TimeUtils;
import java.time.Duration;
import static org.apache.flink.configuration.description.LinkElement.link;
-import static org.apache.flink.configuration.description.TextElement.code;
-/** Akka configuration options. */
+/**
+ * RPC configuration options.
+ *
+ * @deprecated Use {@link RpcOptions} instead.
+ */
@PublicEvolving
+@Deprecated // since 1.19.0
public class AkkaOptions {
- @Internal
- @Documentation.ExcludeFromDocumentation("Internal use only")
- public static final ConfigOption<Boolean>
FORCE_RPC_INVOCATION_SERIALIZATION =
- ConfigOptions.key("pekko.rpc.force-invocation-serialization")
- .booleanType()
- .defaultValue(false)
-
.withDeprecatedKeys("akka.rpc.force-invocation-serialization")
- .withDescription(
- Description.builder()
- .text(
- "Forces the serialization of all
RPC invocations (that are not explicitly annotated with %s)."
- + "This option can be used
to find serialization issues in the argument/response types without relying
requiring HA setups."
- + "This option should not
be enabled in production.",
-
code("org.apache.flink.runtime.rpc.Local"))
- .build());
-
public static boolean
isForceRpcInvocationSerializationEnabled(Configuration config) {
- return config.getOptional(FORCE_RPC_INVOCATION_SERIALIZATION)
- .orElse(
- FORCE_RPC_INVOCATION_SERIALIZATION.defaultValue()
- || System.getProperties()
-
.containsKey(FORCE_RPC_INVOCATION_SERIALIZATION.key()));
+ return RpcOptions.isForceRpcInvocationSerializationEnabled(config);
}
/** Flag whether to capture call stacks for RPC ask calls. */
public static final ConfigOption<Boolean> CAPTURE_ASK_CALLSTACK =
- ConfigOptions.key("pekko.ask.callstack")
- .booleanType()
- .defaultValue(true)
- .withDeprecatedKeys("akka.ask.callstack")
- .withDescription(
- "If true, call stack for asynchronous asks are
captured. That way, when an ask fails "
- + "(for example times out), you get a
proper exception, describing to the original method call and "
- + "call site. Note that in case of having
millions of concurrent RPC calls, this may add to the "
- + "memory footprint.");
+ RpcOptions.CAPTURE_ASK_CALLSTACK;
/** Timeout for Pekko ask calls. */
public static final ConfigOption<Duration> ASK_TIMEOUT_DURATION =
- ConfigOptions.key("pekko.ask.timeout")
- .durationType()
- .defaultValue(Duration.ofSeconds(10))
- .withDeprecatedKeys("akka.ask.timeout")
- .withDescription(
- "Timeout used for all futures and blocking Pekko
calls. If Flink fails due to timeouts then you"
- + " should try to increase this value.
Timeouts can be caused by slow machines or a congested network. The"
- + " timeout value requires a time-unit
specifier (ms/s/min/h/d).");
+ RpcOptions.ASK_TIMEOUT_DURATION;
- /** @deprecated Use {@link #ASK_TIMEOUT_DURATION} */
+ /** @deprecated Use {@link RpcOptions#ASK_TIMEOUT_DURATION} */
@Deprecated
public static final ConfigOption<String> ASK_TIMEOUT =
- ConfigOptions.key(ASK_TIMEOUT_DURATION.key())
+ ConfigOptions.key(RpcOptions.ASK_TIMEOUT_DURATION.key())
.stringType()
.defaultValue(
-
TimeUtils.formatWithHighestUnit(ASK_TIMEOUT_DURATION.defaultValue()))
- .withDescription(ASK_TIMEOUT_DURATION.description());
+ TimeUtils.formatWithHighestUnit(
+
RpcOptions.ASK_TIMEOUT_DURATION.defaultValue()))
+
.withDescription(RpcOptions.ASK_TIMEOUT_DURATION.description());
/** The Pekko tcp connection timeout. */
- public static final ConfigOption<String> TCP_TIMEOUT =
- ConfigOptions.key("pekko.tcp.timeout")
- .stringType()
- .defaultValue("20 s")
- .withDeprecatedKeys("akka.tcp.timeout")
- .withDescription(
- "Timeout for all outbound connections. If you
should experience problems with connecting to a"
- + " TaskManager due to a slow network, you
should increase this value.");
+ public static final ConfigOption<String> TCP_TIMEOUT =
RpcOptions.TCP_TIMEOUT;
/** Timeout for the startup of the actor system. */
- public static final ConfigOption<String> STARTUP_TIMEOUT =
- ConfigOptions.key("pekko.startup-timeout")
- .stringType()
- .noDefaultValue()
- .withDeprecatedKeys("akka.startup-timeout")
- .withDescription(
- "Timeout after which the startup of a remote
component is considered being failed.");
+ public static final ConfigOption<String> STARTUP_TIMEOUT =
RpcOptions.STARTUP_TIMEOUT;
/** Override SSL support for the Pekko transport. */
- public static final ConfigOption<Boolean> SSL_ENABLED =
- ConfigOptions.key("pekko.ssl.enabled")
- .booleanType()
- .defaultValue(true)
- .withDeprecatedKeys("akka.ssl.enabled")
- .withDescription(
- "Turns on SSL for Pekko’s remote communication.
This is applicable only when the global ssl flag"
- + " security.ssl.enabled is set to true.");
+ public static final ConfigOption<Boolean> SSL_ENABLED =
RpcOptions.SSL_ENABLED;
/** Maximum framesize of Pekko messages. */
- public static final ConfigOption<String> FRAMESIZE =
- ConfigOptions.key("pekko.framesize")
- .stringType()
- .defaultValue("10485760b")
- .withDeprecatedKeys("akka.framesize")
- .withDescription(
- "Maximum size of messages which are sent between
the JobManager and the TaskManagers. If Flink"
- + " fails because messages exceed this
limit, then you should increase it. The message size requires a"
- + " size-unit specifier.");
+ public static final ConfigOption<String> FRAMESIZE = RpcOptions.FRAMESIZE;
/** Maximum number of messages until another actor is executed by the same
thread. */
public static final ConfigOption<Integer> DISPATCHER_THROUGHPUT =
- ConfigOptions.key("pekko.throughput")
- .intType()
- .defaultValue(15)
- .withDeprecatedKeys("akka.throughput")
- .withDescription(
- "Number of messages that are processed in a batch
before returning the thread to the pool. Low"
- + " values denote a fair scheduling
whereas high values can increase the performance at the cost of unfairness.");
+ RpcOptions.DISPATCHER_THROUGHPUT;
/** Log lifecycle events. */
public static final ConfigOption<Boolean> LOG_LIFECYCLE_EVENTS =
- ConfigOptions.key("pekko.log.lifecycle.events")
- .booleanType()
- .defaultValue(false)
- .withDeprecatedKeys("akka.log.lifecycle.events")
- .withDescription(
- "Turns on the Pekko’s remote logging of events.
Set this value to 'true' in case of debugging.");
+ RpcOptions.LOG_LIFECYCLE_EVENTS;
/** Timeout for all blocking calls that look up remote actors. */
public static final ConfigOption<Duration> LOOKUP_TIMEOUT_DURATION =
- ConfigOptions.key("pekko.lookup.timeout")
- .durationType()
- .defaultValue(Duration.ofSeconds(10))
- .withDeprecatedKeys("akka.lookup.timeout")
- .withDescription(
- "Timeout used for the lookup of the JobManager.
The timeout value has to contain a time-unit"
- + " specifier (ms/s/min/h/d).");
+ RpcOptions.LOOKUP_TIMEOUT_DURATION;
- /** @deprecated use {@link #LOOKUP_TIMEOUT_DURATION} */
+ /** @deprecated use {@link RpcOptions#LOOKUP_TIMEOUT_DURATION} */
@Deprecated
public static final ConfigOption<String> LOOKUP_TIMEOUT =
- ConfigOptions.key(LOOKUP_TIMEOUT_DURATION.key())
+ ConfigOptions.key(RpcOptions.LOOKUP_TIMEOUT_DURATION.key())
.stringType()
.defaultValue(
-
TimeUtils.formatWithHighestUnit(LOOKUP_TIMEOUT_DURATION.defaultValue()))
- .withDescription(LOOKUP_TIMEOUT_DURATION.description());
+ TimeUtils.formatWithHighestUnit(
+
RpcOptions.LOOKUP_TIMEOUT_DURATION.defaultValue()))
+
.withDescription(RpcOptions.LOOKUP_TIMEOUT_DURATION.description());
/**
* Timeout for all blocking calls on the client side.
@@ -183,167 +107,58 @@ public class AkkaOptions {
/** Exit JVM on fatal Pekko errors. */
public static final ConfigOption<Boolean> JVM_EXIT_ON_FATAL_ERROR =
- ConfigOptions.key("pekko.jvm-exit-on-fatal-error")
- .booleanType()
- .defaultValue(true)
- .withDeprecatedKeys("akka.jvm-exit-on-fatal-error")
- .withDescription("Exit JVM on fatal Pekko errors.");
+ RpcOptions.JVM_EXIT_ON_FATAL_ERROR;
/** Milliseconds a gate should be closed for after a remote connection was
disconnected. */
- public static final ConfigOption<Long> RETRY_GATE_CLOSED_FOR =
- ConfigOptions.key("pekko.retry-gate-closed-for")
- .longType()
- .defaultValue(50L)
- .withDeprecatedKeys("akka.retry-gate-closed-for")
- .withDescription(
- "Milliseconds a gate should be closed for after a
remote connection was disconnected.");
+ public static final ConfigOption<Long> RETRY_GATE_CLOSED_FOR =
RpcOptions.RETRY_GATE_CLOSED_FOR;
// ==================================================
// Configurations for fork-join-executor.
// ==================================================
public static final ConfigOption<Double>
FORK_JOIN_EXECUTOR_PARALLELISM_FACTOR =
- ConfigOptions.key("pekko.fork-join-executor.parallelism-factor")
- .doubleType()
- .defaultValue(2.0)
-
.withDeprecatedKeys("akka.fork-join-executor.parallelism-factor")
- .withDescription(
- Description.builder()
- .text(
- "The parallelism factor is used to
determine thread pool size using the"
- + " following formula:
ceil(available processors * factor). Resulting size"
- + " is then bounded by the
parallelism-min and parallelism-max values.")
- .build());
+ RpcOptions.FORK_JOIN_EXECUTOR_PARALLELISM_FACTOR;
public static final ConfigOption<Integer>
FORK_JOIN_EXECUTOR_PARALLELISM_MIN =
- ConfigOptions.key("pekko.fork-join-executor.parallelism-min")
- .intType()
- .defaultValue(8)
-
.withDeprecatedKeys("akka.fork-join-executor.parallelism-min")
- .withDescription(
- Description.builder()
- .text(
- "Min number of threads to cap
factor-based parallelism number to.")
- .build());
+ RpcOptions.FORK_JOIN_EXECUTOR_PARALLELISM_MIN;
public static final ConfigOption<Integer>
FORK_JOIN_EXECUTOR_PARALLELISM_MAX =
- ConfigOptions.key("pekko.fork-join-executor.parallelism-max")
- .intType()
- .defaultValue(64)
-
.withDeprecatedKeys("akka.fork-join-executor.parallelism-max")
- .withDescription(
- Description.builder()
- .text(
- "Max number of threads to cap
factor-based parallelism number to.")
- .build());
+ RpcOptions.FORK_JOIN_EXECUTOR_PARALLELISM_MAX;
public static final ConfigOption<Double>
REMOTE_FORK_JOIN_EXECUTOR_PARALLELISM_FACTOR =
-
ConfigOptions.key("pekko.remote-fork-join-executor.parallelism-factor")
- .doubleType()
- .defaultValue(2.0)
- .withDescription(
- Description.builder()
- .text(
- "The parallelism factor is used to
determine thread pool size using the"
- + " following formula:
ceil(available processors * factor). Resulting size"
- + " is then bounded by the
parallelism-min and parallelism-max values.")
- .build());
+ RpcOptions.REMOTE_FORK_JOIN_EXECUTOR_PARALLELISM_FACTOR;
public static final ConfigOption<Integer>
REMOTE_FORK_JOIN_EXECUTOR_PARALLELISM_MIN =
-
ConfigOptions.key("pekko.remote-fork-join-executor.parallelism-min")
- .intType()
- .defaultValue(8)
- .withDescription(
- Description.builder()
- .text(
- "Min number of threads to cap
factor-based parallelism number to.")
- .build());
+ RpcOptions.REMOTE_FORK_JOIN_EXECUTOR_PARALLELISM_MIN;
public static final ConfigOption<Integer>
REMOTE_FORK_JOIN_EXECUTOR_PARALLELISM_MAX =
-
ConfigOptions.key("pekko.remote-fork-join-executor.parallelism-max")
- .intType()
- .defaultValue(16)
- .withDescription(
- Description.builder()
- .text(
- "Max number of threads to cap
factor-based parallelism number to.")
- .build());
+ RpcOptions.REMOTE_FORK_JOIN_EXECUTOR_PARALLELISM_MAX;
// ==================================================
// Configurations for client-socket-work-pool.
// ==================================================
public static final ConfigOption<Integer>
CLIENT_SOCKET_WORKER_POOL_SIZE_MIN =
- ConfigOptions.key("pekko.client-socket-worker-pool.pool-size-min")
- .intType()
- .defaultValue(1)
-
.withDeprecatedKeys("akka.client-socket-worker-pool.pool-size-min")
- .withDescription(
- Description.builder()
- .text("Min number of threads to cap
factor-based number to.")
- .build());
+ RpcOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_MIN;
public static final ConfigOption<Integer>
CLIENT_SOCKET_WORKER_POOL_SIZE_MAX =
- ConfigOptions.key("pekko.client-socket-worker-pool.pool-size-max")
- .intType()
- .defaultValue(2)
-
.withDeprecatedKeys("akka.client-socket-worker-pool.pool-size-max")
- .withDescription(
- Description.builder()
- .text("Max number of threads to cap
factor-based number to.")
- .build());
+ RpcOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_MAX;
public static final ConfigOption<Double>
CLIENT_SOCKET_WORKER_POOL_SIZE_FACTOR =
-
ConfigOptions.key("pekko.client-socket-worker-pool.pool-size-factor")
- .doubleType()
- .defaultValue(1.0)
-
.withDeprecatedKeys("akka.client-socket-worker-pool.pool-size-factor")
- .withDescription(
- Description.builder()
- .text(
- "The pool size factor is used to
determine thread pool size"
- + " using the following
formula: ceil(available processors * factor)."
- + " Resulting size is then
bounded by the pool-size-min and"
- + " pool-size-max values.")
- .build());
+ RpcOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_FACTOR;
// ==================================================
// Configurations for server-socket-work-pool.
// ==================================================
public static final ConfigOption<Integer>
SERVER_SOCKET_WORKER_POOL_SIZE_MIN =
- ConfigOptions.key("pekko.server-socket-worker-pool.pool-size-min")
- .intType()
- .defaultValue(1)
-
.withDeprecatedKeys("akka.server-socket-worker-pool.pool-size-min")
- .withDescription(
- Description.builder()
- .text("Min number of threads to cap
factor-based number to.")
- .build());
+ RpcOptions.SERVER_SOCKET_WORKER_POOL_SIZE_MIN;
public static final ConfigOption<Integer>
SERVER_SOCKET_WORKER_POOL_SIZE_MAX =
- ConfigOptions.key("pekko.server-socket-worker-pool.pool-size-max")
- .intType()
- .defaultValue(2)
-
.withDeprecatedKeys("akka.server-socket-worker-pool.pool-size-max")
- .withDescription(
- Description.builder()
- .text("Max number of threads to cap
factor-based number to.")
- .build());
+ RpcOptions.SERVER_SOCKET_WORKER_POOL_SIZE_MAX;
public static final ConfigOption<Double>
SERVER_SOCKET_WORKER_POOL_SIZE_FACTOR =
-
ConfigOptions.key("pekko.server-socket-worker-pool.pool-size-factor")
- .doubleType()
- .defaultValue(1.0)
-
.withDeprecatedKeys("akka.server-socket-worker-pool.pool-size-factor")
- .withDescription(
- Description.builder()
- .text(
- "The pool size factor is used to
determine thread pool size"
- + " using the following
formula: ceil(available processors * factor)."
- + " Resulting size is then
bounded by the pool-size-min and"
- + " pool-size-max values.")
- .build());
+ RpcOptions.SERVER_SOCKET_WORKER_POOL_SIZE_FACTOR;
// ==================================================
// Deprecated options
diff --git
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index b7751abd07b..58a33a6060a 100644
---
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -764,14 +764,14 @@ public final class ConfigConstants {
/**
* Timeout for the startup of the actor system.
*
- * @deprecated Use {@link AkkaOptions#STARTUP_TIMEOUT} instead.
+ * @deprecated Use {@link RpcOptions#STARTUP_TIMEOUT} instead.
*/
@Deprecated public static final String AKKA_STARTUP_TIMEOUT =
"akka.startup-timeout";
/**
* Heartbeat interval of the transport failure detector.
*
- * @deprecated Use {@link AkkaOptions#TRANSPORT_HEARTBEAT_INTERVAL}
instead.
+ * @deprecated Use {@link RpcOptions#TRANSPORT_HEARTBEAT_INTERVAL} instead.
*/
@Deprecated
public static final String AKKA_TRANSPORT_HEARTBEAT_INTERVAL =
@@ -780,7 +780,7 @@ public final class ConfigConstants {
/**
* Allowed heartbeat pause for the transport failure detector.
*
- * @deprecated Use {@link AkkaOptions#TRANSPORT_HEARTBEAT_PAUSE} instead.
+ * @deprecated Use {@link RpcOptions#TRANSPORT_HEARTBEAT_PAUSE} instead.
*/
@Deprecated
public static final String AKKA_TRANSPORT_HEARTBEAT_PAUSE =
"akka.transport.heartbeat.pause";
@@ -788,7 +788,7 @@ public final class ConfigConstants {
/**
* Detection threshold of transport failure detector.
*
- * @deprecated Use {@link AkkaOptions#TRANSPORT_THRESHOLD} instead.
+ * @deprecated Use {@link RpcOptions#TRANSPORT_THRESHOLD} instead.
*/
@Deprecated public static final String AKKA_TRANSPORT_THRESHOLD =
"akka.transport.threshold";
@@ -818,49 +818,49 @@ public final class ConfigConstants {
/**
* Akka TCP timeout.
*
- * @deprecated Use {@link AkkaOptions#TCP_TIMEOUT} instead.
+ * @deprecated Use {@link RpcOptions#TCP_TIMEOUT} instead.
*/
@Deprecated public static final String AKKA_TCP_TIMEOUT =
"akka.tcp.timeout";
/**
* Override SSL support for the Akka transport.
*
- * @deprecated Use {@link AkkaOptions#SSL_ENABLED} instead.
+ * @deprecated Use {@link RpcOptions#SSL_ENABLED} instead.
*/
@Deprecated public static final String AKKA_SSL_ENABLED =
"akka.ssl.enabled";
/**
* Maximum framesize of akka messages.
*
- * @deprecated Use {@link AkkaOptions#FRAMESIZE} instead.
+ * @deprecated Use {@link RpcOptions#FRAMESIZE} instead.
*/
@Deprecated public static final String AKKA_FRAMESIZE = "akka.framesize";
/**
* Maximum number of messages until another actor is executed by the same
thread.
*
- * @deprecated Use {@link AkkaOptions#DISPATCHER_THROUGHPUT} instead.
+ * @deprecated Use {@link RpcOptions#DISPATCHER_THROUGHPUT} instead.
*/
@Deprecated public static final String AKKA_DISPATCHER_THROUGHPUT =
"akka.throughput";
/**
* Log lifecycle events.
*
- * @deprecated Use {@link AkkaOptions#LOG_LIFECYCLE_EVENTS} instead.
+ * @deprecated Use {@link RpcOptions#LOG_LIFECYCLE_EVENTS} instead.
*/
@Deprecated public static final String AKKA_LOG_LIFECYCLE_EVENTS =
"akka.log.lifecycle.events";
/**
* Timeout for all blocking calls on the cluster side.
*
- * @deprecated Use {@link AkkaOptions#ASK_TIMEOUT} instead.
+ * @deprecated Use {@link RpcOptions#ASK_TIMEOUT_DURATION} instead.
*/
@Deprecated public static final String AKKA_ASK_TIMEOUT =
"akka.ask.timeout";
/**
* Timeout for all blocking calls that look up remote actors.
*
- * @deprecated Use {@link AkkaOptions#LOOKUP_TIMEOUT} instead.
+ * @deprecated Use {@link RpcOptions#LOOKUP_TIMEOUT_DURATION} instead.
*/
@Deprecated public static final String AKKA_LOOKUP_TIMEOUT =
"akka.lookup.timeout";
@@ -874,7 +874,7 @@ public final class ConfigConstants {
/**
* Exit JVM on fatal Akka errors.
*
- * @deprecated Use {@link AkkaOptions#JVM_EXIT_ON_FATAL_ERROR} instead.
+ * @deprecated Use {@link RpcOptions#JVM_EXIT_ON_FATAL_ERROR} instead.
*/
@Deprecated
public static final String AKKA_JVM_EXIT_ON_FATAL_ERROR =
"akka.jvm-exit-on-fatal-error";
@@ -1547,37 +1547,37 @@ public final class ConfigConstants {
// ------------------------------ Akka Values
------------------------------
- /** @deprecated Use {@link AkkaOptions#TRANSPORT_HEARTBEAT_INTERVAL}
instead. */
+ /** @deprecated Use {@link RpcOptions#TRANSPORT_HEARTBEAT_INTERVAL}
instead. */
@Deprecated public static final String
DEFAULT_AKKA_TRANSPORT_HEARTBEAT_INTERVAL = "1000 s";
- /** @deprecated Use {@link AkkaOptions#TRANSPORT_HEARTBEAT_PAUSE} instead.
*/
+ /** @deprecated Use {@link RpcOptions#TRANSPORT_HEARTBEAT_PAUSE} instead.
*/
@Deprecated public static final String
DEFAULT_AKKA_TRANSPORT_HEARTBEAT_PAUSE = "6000 s";
- /** @deprecated Use {@link AkkaOptions#TRANSPORT_THRESHOLD} instead. */
+ /** @deprecated Use {@link RpcOptions#TRANSPORT_THRESHOLD} instead. */
@Deprecated public static final double DEFAULT_AKKA_TRANSPORT_THRESHOLD =
300.0;
/** @deprecated This default value is no longer used and has no effect on
Flink. */
@Deprecated public static final double DEFAULT_AKKA_WATCH_THRESHOLD = 12;
- /** @deprecated Use {@link AkkaOptions#DISPATCHER_THROUGHPUT} instead. */
+ /** @deprecated Use {@link RpcOptions#DISPATCHER_THROUGHPUT} instead. */
@Deprecated public static final int DEFAULT_AKKA_DISPATCHER_THROUGHPUT =
15;
- /** @deprecated Use {@link AkkaOptions#LOG_LIFECYCLE_EVENTS} instead. */
+ /** @deprecated Use {@link RpcOptions#LOG_LIFECYCLE_EVENTS} instead. */
@Deprecated public static final boolean DEFAULT_AKKA_LOG_LIFECYCLE_EVENTS
= false;
- /** @deprecated Use {@link AkkaOptions#FRAMESIZE} instead. */
+ /** @deprecated Use {@link RpcOptions#FRAMESIZE} instead. */
@Deprecated public static final String DEFAULT_AKKA_FRAMESIZE =
"10485760b";
- /** @deprecated Use {@link AkkaOptions#ASK_TIMEOUT} instead. */
+ /** @deprecated Use {@link RpcOptions#ASK_TIMEOUT_DURATION} instead. */
@Deprecated public static final String DEFAULT_AKKA_ASK_TIMEOUT = "10 s";
- /** @deprecated Use {@link AkkaOptions#LOOKUP_TIMEOUT} instead. */
+ /** @deprecated Use {@link RpcOptions#LOOKUP_TIMEOUT_DURATION} instead. */
@Deprecated public static final String DEFAULT_AKKA_LOOKUP_TIMEOUT = "10
s";
/** @deprecated Use {@code ClientOptions#CLIENT_TIMEOUT} instead. */
@Deprecated public static final String DEFAULT_AKKA_CLIENT_TIMEOUT = "60
s";
- /** @deprecated Use {@link AkkaOptions#SSL_ENABLED} instead. */
+ /** @deprecated Use {@link RpcOptions#SSL_ENABLED} instead. */
@Deprecated public static final boolean DEFAULT_AKKA_SSL_ENABLED = true;
// ----------------------------- SSL Values
--------------------------------
diff --git
a/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java
b/flink-core/src/main/java/org/apache/flink/configuration/RpcOptions.java
similarity index 75%
copy from
flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java
copy to flink-core/src/main/java/org/apache/flink/configuration/RpcOptions.java
index 5f3cccc495b..a2196634788 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/RpcOptions.java
@@ -22,16 +22,14 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.docs.Documentation;
import org.apache.flink.configuration.description.Description;
-import org.apache.flink.util.TimeUtils;
import java.time.Duration;
-import static org.apache.flink.configuration.description.LinkElement.link;
import static org.apache.flink.configuration.description.TextElement.code;
-/** Akka configuration options. */
+/** RPC configuration options. */
@PublicEvolving
-public class AkkaOptions {
+public class RpcOptions {
@Internal
@Documentation.ExcludeFromDocumentation("Internal use only")
@@ -80,15 +78,6 @@ public class AkkaOptions {
+ " should try to increase this value.
Timeouts can be caused by slow machines or a congested network. The"
+ " timeout value requires a time-unit
specifier (ms/s/min/h/d).");
- /** @deprecated Use {@link #ASK_TIMEOUT_DURATION} */
- @Deprecated
- public static final ConfigOption<String> ASK_TIMEOUT =
- ConfigOptions.key(ASK_TIMEOUT_DURATION.key())
- .stringType()
- .defaultValue(
-
TimeUtils.formatWithHighestUnit(ASK_TIMEOUT_DURATION.defaultValue()))
- .withDescription(ASK_TIMEOUT_DURATION.description());
-
/** The Pekko tcp connection timeout. */
public static final ConfigOption<String> TCP_TIMEOUT =
ConfigOptions.key("pekko.tcp.timeout")
@@ -158,29 +147,6 @@ public class AkkaOptions {
"Timeout used for the lookup of the JobManager.
The timeout value has to contain a time-unit"
+ " specifier (ms/s/min/h/d).");
- /** @deprecated use {@link #LOOKUP_TIMEOUT_DURATION} */
- @Deprecated
- public static final ConfigOption<String> LOOKUP_TIMEOUT =
- ConfigOptions.key(LOOKUP_TIMEOUT_DURATION.key())
- .stringType()
- .defaultValue(
-
TimeUtils.formatWithHighestUnit(LOOKUP_TIMEOUT_DURATION.defaultValue()))
- .withDescription(LOOKUP_TIMEOUT_DURATION.description());
-
- /**
- * Timeout for all blocking calls on the client side.
- *
- * @deprecated Use the {@code ClientOptions.CLIENT_TIMEOUT} instead.
- */
- @Deprecated
- public static final ConfigOption<String> CLIENT_TIMEOUT =
- ConfigOptions.key("akka.client.timeout")
- .stringType()
- .defaultValue("60 s")
- .withDescription(
- "DEPRECATED: Use the \"client.timeout\" instead."
- + " Timeout for all blocking calls on the
client side.");
-
/** Exit JVM on fatal Pekko errors. */
public static final ConfigOption<Boolean> JVM_EXIT_ON_FATAL_ERROR =
ConfigOptions.key("pekko.jvm-exit-on-fatal-error")
@@ -344,74 +310,4 @@ public class AkkaOptions {
+ " Resulting size is then
bounded by the pool-size-min and"
+ " pool-size-max values.")
.build());
-
- // ==================================================
- // Deprecated options
- // ==================================================
-
- /**
- * The Akka death watch heartbeat interval.
- *
- * @deprecated Don't use this option anymore. It has no effect on Flink.
- */
- @Deprecated
- public static final ConfigOption<String> WATCH_HEARTBEAT_INTERVAL =
- ConfigOptions.key("akka.watch.heartbeat.interval")
- .stringType()
- .defaultValue(ASK_TIMEOUT.defaultValue())
- .withDescription(
- Description.builder()
- .text(
- "Heartbeat interval for Akka’s
DeathWatch mechanism to detect dead TaskManagers. If"
- + " TaskManagers are
wrongly marked dead because of lost or delayed heartbeat messages, then you"
- + " should decrease this
value or increase akka.watch.heartbeat.pause. A thorough description of"
- + " Akka’s DeathWatch can
be found %s",
- link(
-
"https://pekko.apache.org/docs/pekko/current/remoting-artery.html#failure-detector",
- "here"))
- .build());
-
- /**
- * The maximum acceptable Akka death watch heartbeat pause.
- *
- * @deprecated Don't use this option anymore. It has no effect on Flink.
- */
- @Deprecated
- public static final ConfigOption<String> WATCH_HEARTBEAT_PAUSE =
- ConfigOptions.key("akka.watch.heartbeat.pause")
- .stringType()
- .defaultValue("60 s")
- .withDescription(
- Description.builder()
- .text(
- "Acceptable heartbeat pause for
Akka’s DeathWatch mechanism. A low value does not allow an"
- + " irregular heartbeat.
If TaskManagers are wrongly marked dead because of lost or delayed"
- + " heartbeat messages,
then you should increase this value or decrease akka.watch.heartbeat.interval."
- + " Higher value increases
the time to detect a dead TaskManager. A thorough description of Akka’s"
- + " DeathWatch can be
found %s",
- link(
-
"https://pekko.apache.org/docs/pekko/current/remoting-artery.html#failure-detector",
- "here"))
- .build());
-
- /**
- * Detection threshold for the phi accrual watch failure detector.
- *
- * @deprecated Don't use this option anymore. It has no effect on Flink.
- */
- @Deprecated
- public static final ConfigOption<Integer> WATCH_THRESHOLD =
- ConfigOptions.key("akka.watch.threshold")
- .intType()
- .defaultValue(12)
- .withDescription(
- Description.builder()
- .text(
- "Threshold for the DeathWatch
failure detector. A low value is prone to false positives whereas"
- + " a high value increases
the time to detect a dead TaskManager. A thorough description of Akka’s"
- + " DeathWatch can be
found %s",
- link(
-
"https://pekko.apache.org/docs/pekko/current/remoting-artery.html#failure-detector",
- "here"))
- .build());
}
diff --git
a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
index f6c64c2f6db..aaeee7e74b4 100644
---
a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
+++
b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
@@ -266,15 +266,15 @@ public class TaskManagerOptions {
public static final ConfigOption<Duration> SLOT_TIMEOUT =
key("taskmanager.slot.timeout")
.durationType()
-
.defaultValue(AkkaOptions.ASK_TIMEOUT_DURATION.defaultValue())
- .withFallbackKeys(AkkaOptions.ASK_TIMEOUT_DURATION.key())
+
.defaultValue(RpcOptions.ASK_TIMEOUT_DURATION.defaultValue())
+ .withFallbackKeys(RpcOptions.ASK_TIMEOUT_DURATION.key())
.withDescription(
Description.builder()
.text(
"Timeout used for identifying
inactive slots. The TaskManager will free the slot if it does not become active
"
+ "within the given amount
of time. Inactive slots can be caused by an out-dated slot request. If no "
+ "value is configured,
then it will fall back to %s.",
-
code(AkkaOptions.ASK_TIMEOUT_DURATION.key()))
+
code(RpcOptions.ASK_TIMEOUT_DURATION.key()))
.build());
@Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER)
diff --git
a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/ActorSystemBootstrapTools.java
b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/ActorSystemBootstrapTools.java
index 5149919ab83..86ccdca9226 100644
---
a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/ActorSystemBootstrapTools.java
+++
b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/ActorSystemBootstrapTools.java
@@ -18,8 +18,8 @@
package org.apache.flink.runtime.rpc.pekko;
import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RpcOptions;
import org.apache.flink.runtime.rpc.RpcSystem;
import org.apache.flink.util.NetUtils;
@@ -264,11 +264,9 @@ public class ActorSystemBootstrapTools {
public static RpcSystem.ForkJoinExecutorConfiguration
getForkJoinExecutorConfiguration(
final Configuration configuration) {
final double parallelismFactor =
-
configuration.get(AkkaOptions.FORK_JOIN_EXECUTOR_PARALLELISM_FACTOR);
- final int minParallelism =
-
configuration.get(AkkaOptions.FORK_JOIN_EXECUTOR_PARALLELISM_MIN);
- final int maxParallelism =
-
configuration.get(AkkaOptions.FORK_JOIN_EXECUTOR_PARALLELISM_MAX);
+
configuration.get(RpcOptions.FORK_JOIN_EXECUTOR_PARALLELISM_FACTOR);
+ final int minParallelism =
configuration.get(RpcOptions.FORK_JOIN_EXECUTOR_PARALLELISM_MIN);
+ final int maxParallelism =
configuration.get(RpcOptions.FORK_JOIN_EXECUTOR_PARALLELISM_MAX);
return new RpcSystem.ForkJoinExecutorConfiguration(
parallelismFactor, minParallelism, maxParallelism);
@@ -277,11 +275,11 @@ public class ActorSystemBootstrapTools {
public static RpcSystem.ForkJoinExecutorConfiguration
getRemoteForkJoinExecutorConfiguration(
final Configuration configuration) {
final double parallelismFactor =
-
configuration.get(AkkaOptions.REMOTE_FORK_JOIN_EXECUTOR_PARALLELISM_FACTOR);
+
configuration.get(RpcOptions.REMOTE_FORK_JOIN_EXECUTOR_PARALLELISM_FACTOR);
final int minParallelism =
-
configuration.get(AkkaOptions.REMOTE_FORK_JOIN_EXECUTOR_PARALLELISM_MIN);
+
configuration.get(RpcOptions.REMOTE_FORK_JOIN_EXECUTOR_PARALLELISM_MIN);
final int maxParallelism =
-
configuration.get(AkkaOptions.REMOTE_FORK_JOIN_EXECUTOR_PARALLELISM_MAX);
+
configuration.get(RpcOptions.REMOTE_FORK_JOIN_EXECUTOR_PARALLELISM_MAX);
return new RpcSystem.ForkJoinExecutorConfiguration(
parallelismFactor, minParallelism, maxParallelism);
diff --git
a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/PekkoInvocationHandler.java
b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/PekkoInvocationHandler.java
index bc117df44b7..496304dc6aa 100644
---
a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/PekkoInvocationHandler.java
+++
b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/PekkoInvocationHandler.java
@@ -18,7 +18,7 @@
package org.apache.flink.runtime.rpc.pekko;
-import org.apache.flink.configuration.AkkaOptions;
+import org.apache.flink.configuration.RpcOptions;
import org.apache.flink.runtime.concurrent.pekko.ScalaFutureUtils;
import org.apache.flink.runtime.rpc.FencedRpcGateway;
import org.apache.flink.runtime.rpc.Local;
@@ -400,7 +400,7 @@ class PekkoInvocationHandler implements InvocationHandler,
PekkoBasedEndpoint, R
+ "more time for responding, due
to problems like slow machines or network jitters. In that case, you can try to
increase %s.",
rpcInvocation,
recipient,
- AkkaOptions.ASK_TIMEOUT_DURATION.key()));
+ RpcOptions.ASK_TIMEOUT_DURATION.key()));
}
newException.initCause(exception);
diff --git
a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/PekkoRpcServiceConfiguration.java
b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/PekkoRpcServiceConfiguration.java
index e52c277b37b..ee19ed005df 100644
---
a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/PekkoRpcServiceConfiguration.java
+++
b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/PekkoRpcServiceConfiguration.java
@@ -17,8 +17,8 @@
package org.apache.flink.runtime.rpc.pekko;
-import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RpcOptions;
import javax.annotation.Nonnull;
@@ -77,14 +77,14 @@ public class PekkoRpcServiceConfiguration {
}
public static PekkoRpcServiceConfiguration fromConfiguration(Configuration
configuration) {
- final Duration timeout =
configuration.get(AkkaOptions.ASK_TIMEOUT_DURATION);
+ final Duration timeout =
configuration.get(RpcOptions.ASK_TIMEOUT_DURATION);
final long maximumFramesize =
PekkoRpcServiceUtils.extractMaximumFramesize(configuration);
- final boolean captureAskCallStacks =
configuration.get(AkkaOptions.CAPTURE_ASK_CALLSTACK);
+ final boolean captureAskCallStacks =
configuration.get(RpcOptions.CAPTURE_ASK_CALLSTACK);
final boolean forceRpcInvocationSerialization =
-
AkkaOptions.isForceRpcInvocationSerializationEnabled(configuration);
+
RpcOptions.isForceRpcInvocationSerializationEnabled(configuration);
return new PekkoRpcServiceConfiguration(
configuration,
diff --git
a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/PekkoRpcServiceUtils.java
b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/PekkoRpcServiceUtils.java
index 4de9914aa1c..c3eb3925e96 100644
---
a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/PekkoRpcServiceUtils.java
+++
b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/PekkoRpcServiceUtils.java
@@ -19,8 +19,8 @@
package org.apache.flink.runtime.rpc.pekko;
import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RpcOptions;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.runtime.rpc.AddressResolution;
import org.apache.flink.runtime.rpc.RpcService;
@@ -130,7 +130,7 @@ public class PekkoRpcServiceUtils {
checkNotNull(config, "config is null");
final boolean sslEnabled =
- config.get(AkkaOptions.SSL_ENABLED) &&
SecurityOptions.isInternalSSLEnabled(config);
+ config.get(RpcOptions.SSL_ENABLED) &&
SecurityOptions.isInternalSSLEnabled(config);
return getRpcUrl(
hostname,
@@ -232,7 +232,7 @@ public class PekkoRpcServiceUtils {
// ------------------------------------------------------------------------
public static long extractMaximumFramesize(Configuration configuration) {
- String maxFrameSizeStr = configuration.get(AkkaOptions.FRAMESIZE);
+ String maxFrameSizeStr = configuration.get(RpcOptions.FRAMESIZE);
String configStr = String.format(SIMPLE_CONFIG_TEMPLATE,
maxFrameSizeStr);
Config config = ConfigFactory.parseString(configStr);
return config.getBytes(MAXIMUM_FRAME_SIZE_PATH);
diff --git
a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/PekkoUtils.java
b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/PekkoUtils.java
index 660235e125e..095ae648726 100644
---
a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/PekkoUtils.java
+++
b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/PekkoUtils.java
@@ -18,8 +18,8 @@
package org.apache.flink.runtime.rpc.pekko;
import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RpcOptions;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.runtime.concurrent.pekko.ScalaFutureUtils;
import org.apache.flink.runtime.rpc.RpcSystem;
@@ -70,11 +70,11 @@ class PekkoUtils {
* @return Flink's basic Pekko config
*/
private static Config getBasicConfig(Configuration configuration) {
- final int throughput =
configuration.get(AkkaOptions.DISPATCHER_THROUGHPUT);
+ final int throughput =
configuration.get(RpcOptions.DISPATCHER_THROUGHPUT);
final String jvmExitOnFatalError =
-
booleanToOnOrOff(configuration.get(AkkaOptions.JVM_EXIT_ON_FATAL_ERROR));
+
booleanToOnOrOff(configuration.get(RpcOptions.JVM_EXIT_ON_FATAL_ERROR));
final String logLifecycleEvents =
-
booleanToOnOrOff(configuration.get(AkkaOptions.LOG_LIFECYCLE_EVENTS));
+
booleanToOnOrOff(configuration.get(RpcOptions.LOG_LIFECYCLE_EVENTS));
final String supervisorStrategy =
EscalatingSupervisorStrategy.class.getCanonicalName();
return new ConfigBuilder()
@@ -206,39 +206,39 @@ class PekkoUtils {
private static void addBaseRemoteConfig(
ConfigBuilder configBuilder, Configuration configuration, int
port, int externalPort) {
- final Duration askTimeout =
configuration.get(AkkaOptions.ASK_TIMEOUT_DURATION);
+ final Duration askTimeout =
configuration.get(RpcOptions.ASK_TIMEOUT_DURATION);
final String startupTimeout =
TimeUtils.getStringInMillis(
TimeUtils.parseDuration(
configuration.get(
- AkkaOptions.STARTUP_TIMEOUT,
+ RpcOptions.STARTUP_TIMEOUT,
TimeUtils.getStringInMillis(
askTimeout.multipliedBy(10L)))));
final String tcpTimeout =
TimeUtils.getStringInMillis(
-
TimeUtils.parseDuration(configuration.get(AkkaOptions.TCP_TIMEOUT)));
+
TimeUtils.parseDuration(configuration.get(RpcOptions.TCP_TIMEOUT)));
- final String framesize = configuration.get(AkkaOptions.FRAMESIZE);
+ final String framesize = configuration.get(RpcOptions.FRAMESIZE);
final int clientSocketWorkerPoolPoolSizeMin =
-
configuration.get(AkkaOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_MIN);
+
configuration.get(RpcOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_MIN);
final int clientSocketWorkerPoolPoolSizeMax =
-
configuration.get(AkkaOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_MAX);
+
configuration.get(RpcOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_MAX);
final double clientSocketWorkerPoolPoolSizeFactor =
-
configuration.get(AkkaOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_FACTOR);
+
configuration.get(RpcOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_FACTOR);
final int serverSocketWorkerPoolPoolSizeMin =
-
configuration.get(AkkaOptions.SERVER_SOCKET_WORKER_POOL_SIZE_MIN);
+
configuration.get(RpcOptions.SERVER_SOCKET_WORKER_POOL_SIZE_MIN);
final int serverSocketWorkerPoolPoolSizeMax =
-
configuration.get(AkkaOptions.SERVER_SOCKET_WORKER_POOL_SIZE_MAX);
+
configuration.get(RpcOptions.SERVER_SOCKET_WORKER_POOL_SIZE_MAX);
final double serverSocketWorkerPoolPoolSizeFactor =
-
configuration.get(AkkaOptions.SERVER_SOCKET_WORKER_POOL_SIZE_FACTOR);
+
configuration.get(RpcOptions.SERVER_SOCKET_WORKER_POOL_SIZE_FACTOR);
final String logLifecycleEvents =
-
booleanToOnOrOff(configuration.get(AkkaOptions.LOG_LIFECYCLE_EVENTS));
+
booleanToOnOrOff(configuration.get(RpcOptions.LOG_LIFECYCLE_EVENTS));
- final long retryGateClosedFor =
configuration.get(AkkaOptions.RETRY_GATE_CLOSED_FOR);
+ final long retryGateClosedFor =
configuration.get(RpcOptions.RETRY_GATE_CLOSED_FOR);
configBuilder
.add("pekko {")
@@ -312,7 +312,7 @@ class PekkoUtils {
ConfigBuilder configBuilder, Configuration configuration) {
final boolean enableSSLConfig =
- configuration.get(AkkaOptions.SSL_ENABLED)
+ configuration.get(RpcOptions.SSL_ENABLED)
&& SecurityOptions.isInternalSSLEnabled(configuration);
final String enableSSL = booleanToOnOrOff(enableSSLConfig);
diff --git
a/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/pekko/MessageSerializationTest.java
b/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/pekko/MessageSerializationTest.java
index 7ad851f0322..22aeb2f31dd 100644
---
a/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/pekko/MessageSerializationTest.java
+++
b/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/pekko/MessageSerializationTest.java
@@ -18,8 +18,8 @@
package org.apache.flink.runtime.rpc.pekko;
-import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RpcOptions;
import org.apache.flink.runtime.rpc.Local;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcGateway;
@@ -51,7 +51,7 @@ class MessageSerializationTest {
@BeforeAll
static void setup() throws Exception {
Configuration configuration = new Configuration();
- configuration.set(AkkaOptions.FRAMESIZE, maxFrameSize + "b");
+ configuration.set(RpcOptions.FRAMESIZE, maxFrameSize + "b");
rpcService1 =
PekkoRpcServiceUtils.remoteServiceBuilder(configuration,
"localhost", 0)
diff --git
a/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/pekko/PekkoRpcActorOversizedResponseMessageTest.java
b/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/pekko/PekkoRpcActorOversizedResponseMessageTest.java
index 9170ec784f6..ae3a9828e25 100644
---
a/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/pekko/PekkoRpcActorOversizedResponseMessageTest.java
+++
b/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/pekko/PekkoRpcActorOversizedResponseMessageTest.java
@@ -18,8 +18,8 @@
package org.apache.flink.runtime.rpc.pekko;
-import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RpcOptions;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcGateway;
@@ -57,8 +57,8 @@ class PekkoRpcActorOversizedResponseMessageTest {
static void setupClass() throws Exception {
final Configuration configuration = new Configuration();
// some tests explicitly test local communication where no
serialization should occur
- configuration.set(AkkaOptions.FORCE_RPC_INVOCATION_SERIALIZATION,
false);
- configuration.set(AkkaOptions.FRAMESIZE, FRAMESIZE + " b");
+ configuration.set(RpcOptions.FORCE_RPC_INVOCATION_SERIALIZATION,
false);
+ configuration.set(RpcOptions.FRAMESIZE, FRAMESIZE + " b");
rpcService1 =
PekkoRpcServiceUtils.remoteServiceBuilder(configuration,
"localhost", 0)
diff --git
a/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/pekko/PekkoUtilsTest.java
b/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/pekko/PekkoUtilsTest.java
index 78f300cf0c9..553b2cd5589 100644
---
a/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/pekko/PekkoUtilsTest.java
+++
b/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/pekko/PekkoUtilsTest.java
@@ -17,8 +17,8 @@
package org.apache.flink.runtime.rpc.pekko;
-import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RpcOptions;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.runtime.rpc.AddressResolution;
import org.apache.flink.runtime.rpc.RpcSystem;
@@ -225,7 +225,7 @@ class PekkoUtilsTest {
@Test
void getConfigDefaultsStartupTimeoutTo10TimesOfAskTimeout() {
final Configuration configuration = new Configuration();
- configuration.set(AkkaOptions.ASK_TIMEOUT_DURATION,
Duration.ofMillis(100));
+ configuration.set(RpcOptions.ASK_TIMEOUT_DURATION,
Duration.ofMillis(100));
final Config config =
PekkoUtils.getConfig(configuration, new
HostAndPort("localhost", 31337));
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java
index 7b1a2281d8d..4514f890510 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java
@@ -21,10 +21,10 @@ package org.apache.flink.runtime.jobmaster;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.RpcOptions;
import org.apache.flink.configuration.SchedulerExecutionMode;
import org.apache.flink.core.failure.FailureEnricher;
import org.apache.flink.runtime.blob.BlobWriter;
@@ -152,7 +152,7 @@ public final class DefaultSlotPoolServiceSchedulerFactory
Configuration configuration, JobType jobType, boolean
isDynamicGraph) {
final Time rpcTimeout =
-
Time.fromDuration(configuration.get(AkkaOptions.ASK_TIMEOUT_DURATION));
+
Time.fromDuration(configuration.get(RpcOptions.ASK_TIMEOUT_DURATION));
final Time slotIdleTimeout =
Time.milliseconds(configuration.get(JobManagerOptions.SLOT_IDLE_TIMEOUT));
final Time batchSlotTimeout =
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterConfiguration.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterConfiguration.java
index ce30f698034..47c23ee178b 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterConfiguration.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterConfiguration.java
@@ -19,10 +19,10 @@
package org.apache.flink.runtime.jobmaster;
import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.RpcOptions;
import org.apache.flink.runtime.registration.RetryingRegistrationConfiguration;
import org.apache.flink.util.Preconditions;
@@ -75,7 +75,7 @@ public class JobMasterConfiguration {
public static JobMasterConfiguration fromConfiguration(Configuration
configuration) {
final Time rpcTimeout =
-
Time.fromDuration(configuration.get(AkkaOptions.ASK_TIMEOUT_DURATION));
+
Time.fromDuration(configuration.get(RpcOptions.ASK_TIMEOUT_DURATION));
final Time slotRequestTimeout =
Time.milliseconds(configuration.get(JobManagerOptions.SLOT_REQUEST_TIMEOUT));
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
index 9905906f640..61166015d29 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
@@ -18,12 +18,12 @@
package org.apache.flink.runtime.minicluster;
-import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.ClusterOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.RpcOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.configuration.UnmodifiableConfiguration;
import org.apache.flink.core.plugin.PluginManager;
@@ -90,8 +90,8 @@ public class MiniClusterConfiguration {
}
// increase the ask.timeout if not set in order to harden tests on
slow CI
- if (!modifiedConfig.contains(AkkaOptions.ASK_TIMEOUT_DURATION)) {
- modifiedConfig.set(AkkaOptions.ASK_TIMEOUT_DURATION,
Duration.ofMinutes(5L));
+ if (!modifiedConfig.contains(RpcOptions.ASK_TIMEOUT_DURATION)) {
+ modifiedConfig.set(RpcOptions.ASK_TIMEOUT_DURATION,
Duration.ofMinutes(5L));
}
return new UnmodifiableConfiguration(modifiedConfig);
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerFactory.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerFactory.java
index fc473564b5e..3f28af9f31d 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerFactory.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerFactory.java
@@ -20,10 +20,10 @@ package org.apache.flink.runtime.resourcemanager;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.configuration.ResourceManagerOptions;
+import org.apache.flink.configuration.RpcOptions;
import org.apache.flink.runtime.blocklist.BlocklistUtils;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.entrypoint.ClusterInformation;
@@ -89,7 +89,7 @@ public final class StandaloneResourceManagerFactory extends
ResourceManagerFacto
fatalErrorHandler,
resourceManagerMetricGroup,
standaloneClusterStartupPeriodTime,
-
Time.fromDuration(configuration.get(AkkaOptions.ASK_TIMEOUT_DURATION)),
+
Time.fromDuration(configuration.get(RpcOptions.ASK_TIMEOUT_DURATION)),
ioExecutor);
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java
index 40df2011358..1698a6c4e8b 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java
@@ -20,8 +20,8 @@ package org.apache.flink.runtime.resourcemanager.active;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RpcOptions;
import org.apache.flink.runtime.blocklist.BlocklistHandler;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
@@ -165,7 +165,7 @@ public class ActiveResourceManager<WorkerType extends
ResourceIDRetrievable>
resourceManagerMetricGroup,
Time.fromDuration(
Preconditions.checkNotNull(flinkConfig)
- .get(AkkaOptions.ASK_TIMEOUT_DURATION)),
+ .get(RpcOptions.ASK_TIMEOUT_DURATION)),
ioExecutor);
this.flinkConfig = flinkConfig;
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java
index b5d9734b128..66181662277 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java
@@ -20,10 +20,10 @@ package
org.apache.flink.runtime.resourcemanager.slotmanager;
import org.apache.flink.api.common.resources.CPUResource;
import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.ResourceManagerOptions;
+import org.apache.flink.configuration.RpcOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
import org.apache.flink.util.ConfigurationException;
@@ -238,7 +238,7 @@ public class SlotManagerConfiguration {
throws ConfigurationException {
final Time rpcTimeout =
-
Time.fromDuration(configuration.get(AkkaOptions.ASK_TIMEOUT_DURATION));
+
Time.fromDuration(configuration.get(RpcOptions.ASK_TIMEOUT_DURATION));
final Time taskManagerTimeout =
Time.milliseconds(configuration.get(ResourceManagerOptions.TASK_MANAGER_TIMEOUT));
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
index 3d437b04e1f..34959bcc3f0 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
@@ -18,9 +18,9 @@
package org.apache.flink.runtime.taskexecutor;
-import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ConfigurationUtils;
+import org.apache.flink.configuration.RpcOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.configuration.UnmodifiableConfiguration;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
@@ -193,7 +193,7 @@ public class TaskManagerConfiguration implements
TaskManagerRuntimeInfo {
final String[] tmpDirPaths =
ConfigurationUtils.parseTempDirectories(configuration);
- final Duration rpcTimeout =
configuration.get(AkkaOptions.ASK_TIMEOUT_DURATION);
+ final Duration rpcTimeout =
configuration.get(RpcOptions.ASK_TIMEOUT_DURATION);
LOG.debug("Messages have a max timeout of " + rpcTimeout);
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index 726a414850e..26d3a290c78 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -21,9 +21,9 @@ package org.apache.flink.runtime.taskexecutor;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JMXServerOptions;
+import org.apache.flink.configuration.RpcOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.configuration.TaskManagerOptionsInternal;
import org.apache.flink.core.fs.FileSystem;
@@ -171,7 +171,7 @@ public class TaskManagerRunner implements FatalErrorHandler
{
this.pluginManager = checkNotNull(pluginManager);
this.taskExecutorServiceFactory =
checkNotNull(taskExecutorServiceFactory);
- timeout =
Time.fromDuration(configuration.get(AkkaOptions.ASK_TIMEOUT_DURATION));
+ timeout =
Time.fromDuration(configuration.get(RpcOptions.ASK_TIMEOUT_DURATION));
this.terminationFuture = new CompletableFuture<>();
this.shutdown = false;
@@ -725,7 +725,7 @@ public class TaskManagerRunner implements FatalErrorHandler
{
RpcSystemUtils rpcSystemUtils)
throws LeaderRetrievalException {
- final Duration lookupTimeout =
configuration.get(AkkaOptions.LOOKUP_TIMEOUT_DURATION);
+ final Duration lookupTimeout =
configuration.get(RpcOptions.LOOKUP_TIMEOUT_DURATION);
final InetAddress taskManagerAddress =
LeaderRetrievalUtils.findConnectingAddress(
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
index 48b0e08dd7a..fbd5a456747 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
@@ -19,13 +19,13 @@
package org.apache.flink.runtime.taskexecutor;
import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
+import org.apache.flink.configuration.RpcOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.configuration.TaskManagerOptionsInternal;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -289,7 +289,7 @@ public class TaskManagerServicesConfiguration {
QueryableStateConfiguration.fromConfiguration(configuration);
long timerServiceShutdownTimeout =
- configuration.get(AkkaOptions.ASK_TIMEOUT_DURATION).toMillis();
+ configuration.get(RpcOptions.ASK_TIMEOUT_DURATION).toMillis();
final RetryingRegistrationConfiguration
retryingRegistrationConfiguration =
RetryingRegistrationConfiguration.fromConfiguration(configuration);
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
index af658994f3c..e1d6c16778c 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
@@ -21,10 +21,10 @@ package org.apache.flink.runtime.webmonitor;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.RpcOptions;
import org.apache.flink.runtime.blob.TransientBlobService;
import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
import org.apache.flink.runtime.leaderelection.LeaderContender;
@@ -267,7 +267,7 @@ public class WebMonitorEndpoint<T extends RestfulGateway>
extends RestServerEndp
}
private VertexThreadInfoTracker
initializeThreadInfoTracker(ScheduledExecutorService executor) {
- final Duration askTimeout =
clusterConfiguration.get(AkkaOptions.ASK_TIMEOUT_DURATION);
+ final Duration askTimeout =
clusterConfiguration.get(RpcOptions.ASK_TIMEOUT_DURATION);
final Duration flameGraphCleanUpInterval =
clusterConfiguration.get(RestOptions.FLAMEGRAPH_CLEANUP_INTERVAL);
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingDefaultExecutionGraphBuilder.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingDefaultExecutionGraphBuilder.java
index 4d2156701d6..58e38d62d45 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingDefaultExecutionGraphBuilder.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingDefaultExecutionGraphBuilder.java
@@ -19,8 +19,8 @@
package org.apache.flink.runtime.executiongraph;
import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RpcOptions;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.blob.BlobWriter;
@@ -60,7 +60,7 @@ public class TestingDefaultExecutionGraphBuilder {
return new TestingDefaultExecutionGraphBuilder();
}
- private Time rpcTimeout =
Time.fromDuration(AkkaOptions.ASK_TIMEOUT_DURATION.defaultValue());
+ private Time rpcTimeout =
Time.fromDuration(RpcOptions.ASK_TIMEOUT_DURATION.defaultValue());
private ClassLoader userClassLoader =
DefaultExecutionGraph.class.getClassLoader();
private BlobWriter blobWriter = VoidBlobWriter.getInstance();
private ShuffleMaster<?> shuffleMaster =
ShuffleTestUtils.DEFAULT_SHUFFLE_MASTER;
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
index 7514e04e4cb..f7158a39da1 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
@@ -18,9 +18,9 @@
package org.apache.flink.runtime.io.network.partition;
-import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
+import org.apache.flink.configuration.RpcOptions;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
@@ -62,7 +62,7 @@ class PartialConsumePipelinedResultTest {
private static Configuration getFlinkConfiguration() {
final Configuration config = new Configuration();
- config.set(AkkaOptions.ASK_TIMEOUT_DURATION,
TestingUtils.DEFAULT_ASK_TIMEOUT);
+ config.set(RpcOptions.ASK_TIMEOUT_DURATION,
TestingUtils.DEFAULT_ASK_TIMEOUT);
config.set(NettyShuffleEnvironmentOptions.NETWORK_NUM_BUFFERS,
NUMBER_OF_NETWORK_BUFFERS);
return config;
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java
index e5e4c896b56..53822036e33 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java
@@ -18,8 +18,8 @@
package org.apache.flink.runtime.jobmanager;
-import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RpcOptions;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.reader.RecordReader;
@@ -68,7 +68,7 @@ public class SlotCountExceedingParallelismTest extends
TestLogger {
private static Configuration getFlinkConfiguration() {
final Configuration config = new Configuration();
- config.set(AkkaOptions.ASK_TIMEOUT_DURATION,
TestingUtils.DEFAULT_ASK_TIMEOUT);
+ config.set(RpcOptions.ASK_TIMEOUT_DURATION,
TestingUtils.DEFAULT_ASK_TIMEOUT);
return config;
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManagerFactory.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManagerFactory.java
index 67bc11a4266..8cdcb3e610e 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManagerFactory.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManagerFactory.java
@@ -19,8 +19,8 @@
package org.apache.flink.runtime.resourcemanager;
import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RpcOptions;
import org.apache.flink.runtime.blocklist.BlocklistHandler;
import org.apache.flink.runtime.blocklist.BlocklistUtils;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
@@ -101,7 +101,7 @@ public class TestingResourceManagerFactory extends
ResourceManagerFactory<Resour
clusterInformation,
fatalErrorHandler,
resourceManagerMetricGroup,
-
Time.fromDuration(configuration.get(AkkaOptions.ASK_TIMEOUT_DURATION)),
+
Time.fromDuration(configuration.get(RpcOptions.ASK_TIMEOUT_DURATION)),
ioExecutor);
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java
index c9863579819..4a6a760636c 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java
@@ -18,8 +18,8 @@
package org.apache.flink.runtime.rpc;
-import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RpcOptions;
import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
@@ -41,7 +41,7 @@ class RpcConnectionTest {
// we start the RPC service with a very long timeout to ensure that
the test
// can only pass if the connection problem is not recognized merely
via a timeout
Configuration configuration = new Configuration();
- configuration.set(AkkaOptions.ASK_TIMEOUT_DURATION,
Duration.ofSeconds(10000000));
+ configuration.set(RpcOptions.ASK_TIMEOUT_DURATION,
Duration.ofSeconds(10000000));
try (RpcSystem rpcSystem = RpcSystem.load()) {
final RpcService rpcService =
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcSSLAuthITCase.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcSSLAuthITCase.java
index c6099b169fb..7828cc366e1 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcSSLAuthITCase.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcSSLAuthITCase.java
@@ -18,8 +18,8 @@
package org.apache.flink.runtime.rpc;
-import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RpcOptions;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
import org.apache.flink.util.concurrent.FutureUtils;
@@ -50,10 +50,10 @@ class RpcSSLAuthITCase {
@Test
void testConnectFailure() throws Exception {
final Configuration baseConfig = new Configuration();
- baseConfig.set(AkkaOptions.TCP_TIMEOUT, "1 s");
+ baseConfig.set(RpcOptions.TCP_TIMEOUT, "1 s");
// we start the RPC service with a very long timeout to ensure that
the test
// can only pass if the connection problem is not recognized merely
via a timeout
- baseConfig.set(AkkaOptions.ASK_TIMEOUT_DURATION,
Duration.ofSeconds(10000000));
+ baseConfig.set(RpcOptions.ASK_TIMEOUT_DURATION,
Duration.ofSeconds(10000000));
// !!! This config has KEY_STORE_FILE / TRUST_STORE_FILE !!!
Configuration sslConfig1 = new Configuration(baseConfig);
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerConfigurationTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerConfigurationTest.java
index f30ac260f4f..8d7847a822f 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerConfigurationTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerConfigurationTest.java
@@ -18,11 +18,11 @@
package org.apache.flink.runtime.taskexecutor;
-import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.RpcOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.configuration.TaskManagerOptionsInternal;
import org.apache.flink.configuration.UnmodifiableConfiguration;
@@ -259,7 +259,7 @@ class TaskManagerRunnerConfigurationTest {
final Configuration config = new Configuration();
config.set(TaskManagerOptions.HOST_BIND_POLICY, bindPolicy.toString());
config.set(JobManagerOptions.ADDRESS, "localhost");
- config.set(AkkaOptions.LOOKUP_TIMEOUT_DURATION, Duration.ofMillis(10));
+ config.set(RpcOptions.LOOKUP_TIMEOUT_DURATION, Duration.ofMillis(10));
return new UnmodifiableConfiguration(config);
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterResourceConfiguration.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterResourceConfiguration.java
index 7aec05cbeb3..22ec9dbe70e 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterResourceConfiguration.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterResourceConfiguration.java
@@ -19,9 +19,9 @@
package org.apache.flink.runtime.testutils;
import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.RpcOptions;
import org.apache.flink.configuration.UnmodifiableConfiguration;
import
org.apache.flink.runtime.highavailability.nonha.embedded.HaLeadershipControl;
import org.apache.flink.runtime.minicluster.MiniCluster;
@@ -91,7 +91,7 @@ public class MiniClusterResourceConfiguration {
private int numberTaskManagers = 1;
private int numberSlotsPerTaskManager = 1;
private Time shutdownTimeout =
-
Time.fromDuration(configuration.get(AkkaOptions.ASK_TIMEOUT_DURATION));
+
Time.fromDuration(configuration.get(RpcOptions.ASK_TIMEOUT_DURATION));
private RpcServiceSharing rpcServiceSharing = RpcServiceSharing.SHARED;
private MiniCluster.HaServices haServices =
MiniCluster.HaServices.CONFIGURED;
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
index d1f9f0324fe..86662095af4 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
@@ -18,10 +18,10 @@
package org.apache.flink.runtime.testutils;
-import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.RpcOptions;
import org.apache.flink.configuration.StateBackendOptions;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
@@ -121,7 +121,7 @@ public class ZooKeeperTestUtils {
config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY,
fsStateHandlePath + "/checkpoints");
config.set(HighAvailabilityOptions.HA_STORAGE_PATH, fsStateHandlePath
+ "/recovery");
- config.set(AkkaOptions.ASK_TIMEOUT_DURATION, Duration.ofSeconds(100));
+ config.set(RpcOptions.ASK_TIMEOUT_DURATION, Duration.ofSeconds(100));
return config;
}
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index acdd07735b5..179a3cb13f2 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -51,7 +51,7 @@ import org.apache.flink.api.java.io.TextOutputFormat;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.configuration.AkkaOptions;
+import org.apache.flink.configuration.RpcOptions;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.core.fs.Path;
@@ -1443,7 +1443,7 @@ public class DataStream<T> {
new CollectSinkOperatorFactory<>(serializer, accumulatorName);
CollectSinkOperator<T> operator = (CollectSinkOperator<T>)
factory.getOperator();
long resultFetchTimeout =
-
env.getConfiguration().get(AkkaOptions.ASK_TIMEOUT_DURATION).toMillis();
+
env.getConfiguration().get(RpcOptions.ASK_TIMEOUT_DURATION).toMillis();
CollectResultIterator<T> iterator =
new CollectResultIterator<>(
operator.getOperatorIdFuture(),
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectResultIterator.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectResultIterator.java
index f1562be0135..a6dd08b8f39 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectResultIterator.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectResultIterator.java
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.api.operators.collect;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.configuration.AkkaOptions;
+import org.apache.flink.configuration.RpcOptions;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.streaming.api.CheckpointingMode;
@@ -88,7 +88,7 @@ public class CollectResultIterator<T> implements
CloseableIterator<T> {
operatorIdFuture,
accumulatorName,
retryMillis,
-
AkkaOptions.ASK_TIMEOUT_DURATION.defaultValue().toMillis());
+
RpcOptions.ASK_TIMEOUT_DURATION.defaultValue().toMillis());
this.bufferedResult = null;
}
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/CollectDynamicSink.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/CollectDynamicSink.java
index 0a45f019dd9..4ce00d58bff 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/CollectDynamicSink.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/CollectDynamicSink.java
@@ -21,9 +21,9 @@ package org.apache.flink.table.planner.connectors;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.configuration.RpcOptions;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
@@ -130,7 +130,7 @@ public final class CollectDynamicSink implements
DynamicTableSink {
inputStream
.getExecutionEnvironment()
.getConfiguration()
- .get(AkkaOptions.ASK_TIMEOUT_DURATION)
+ .get(RpcOptions.ASK_TIMEOUT_DURATION)
.toMillis();
iterator =
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecDynamicFilteringDataCollector.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecDynamicFilteringDataCollector.java
index f38181badfc..d51337f5002 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecDynamicFilteringDataCollector.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecDynamicFilteringDataCollector.java
@@ -20,10 +20,10 @@ package
org.apache.flink.table.planner.plan.nodes.exec.batch;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.api.dag.Transformation;
-import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.configuration.RpcOptions;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.delegation.PlannerBase;
@@ -61,7 +61,7 @@ public class BatchExecDynamicFilteringDataCollector extends
ExecNodeBase<Object>
"If the collector collects more data than the
threshold (default is 8M), "
+ "an empty DynamicFilterEvent with a flag
only will be sent to Coordinator, "
+ "which could avoid exceeding the pekko
limit and out-of-memory (see "
- + AkkaOptions.FRAMESIZE.key()
+ + RpcOptions.FRAMESIZE.key()
+ "). Otherwise a DynamicFilterEvent with
all deduplicated records will be sent to Coordinator.");
private final List<Integer> dynamicFilteringFieldIndices;
diff --git
a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SinkTestSuiteBase.java
b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SinkTestSuiteBase.java
index 793e5b0bf9c..2fe57682f2c 100644
---
a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SinkTestSuiteBase.java
+++
b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SinkTestSuiteBase.java
@@ -26,8 +26,8 @@ import
org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.source.Boundedness;
-import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RpcOptions;
import org.apache.flink.connector.testframe.environment.TestEnvironment;
import
org.apache.flink.connector.testframe.environment.TestEnvironmentSettings;
import org.apache.flink.connector.testframe.external.ExternalSystemDataReader;
@@ -614,7 +614,7 @@ public abstract class SinkTestSuiteBase<T extends
Comparable<T>> {
serializer,
accumulatorName,
stream.getExecutionEnvironment().getCheckpointConfig(),
- AkkaOptions.ASK_TIMEOUT_DURATION.defaultValue().toMillis());
+ RpcOptions.ASK_TIMEOUT_DURATION.defaultValue().toMillis());
}
private void waitExpectedSizeData(CollectResultIterator<T> iterator, int
targetNum) {
diff --git
a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SourceTestSuiteBase.java
b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SourceTestSuiteBase.java
index 3efed9d4c16..7f5f0d3013a 100644
---
a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SourceTestSuiteBase.java
+++
b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SourceTestSuiteBase.java
@@ -26,8 +26,8 @@ import
org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
-import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RpcOptions;
import org.apache.flink.connector.testframe.environment.ClusterControllable;
import org.apache.flink.connector.testframe.environment.TestEnvironment;
import
org.apache.flink.connector.testframe.environment.TestEnvironmentSettings;
@@ -795,7 +795,7 @@ public abstract class SourceTestSuiteBase<T> {
serializer,
accumulatorName,
checkpointConfig,
-
AkkaOptions.ASK_TIMEOUT_DURATION.defaultValue().toMillis());
+
RpcOptions.ASK_TIMEOUT_DURATION.defaultValue().toMillis());
iterator.setJobClient(jobClient);
return iterator;
}
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
index a9087f9b249..96219147c30 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
@@ -27,9 +27,9 @@ import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.client.program.ClusterClient;
-import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HeartbeatManagerOptions;
+import org.apache.flink.configuration.RpcOptions;
import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.optimizer.DataStatistics;
@@ -102,7 +102,7 @@ public class AccumulatorLiveITCase extends TestLogger {
private static Configuration getConfiguration() {
Configuration config = new Configuration();
- config.set(AkkaOptions.ASK_TIMEOUT_DURATION,
TestingUtils.DEFAULT_ASK_TIMEOUT);
+ config.set(RpcOptions.ASK_TIMEOUT_DURATION,
TestingUtils.DEFAULT_ASK_TIMEOUT);
config.set(HeartbeatManagerOptions.HEARTBEAT_INTERVAL,
HEARTBEAT_INTERVAL);
return config;
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
index cd9206c781e..5bcd17b9826 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
@@ -22,11 +22,11 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.Plan;
import org.apache.flink.client.program.ClusterClient;
-import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
+import org.apache.flink.configuration.RpcOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.optimizer.DataStatistics;
import org.apache.flink.optimizer.Optimizer;
@@ -85,7 +85,7 @@ public abstract class CancelingTestBase extends TestLogger {
verifyJvmOptions();
Configuration config = new Configuration();
config.set(CoreOptions.FILESYTEM_DEFAULT_OVERRIDE, true);
- config.set(AkkaOptions.ASK_TIMEOUT_DURATION,
TestingUtils.DEFAULT_ASK_TIMEOUT);
+ config.set(RpcOptions.ASK_TIMEOUT_DURATION,
TestingUtils.DEFAULT_ASK_TIMEOUT);
config.set(TaskManagerOptions.MEMORY_SEGMENT_SIZE,
MemorySize.parse("4096"));
config.set(NettyShuffleEnvironmentOptions.NETWORK_NUM_BUFFERS, 2048);
@@ -99,7 +99,7 @@ public abstract class CancelingTestBase extends TestLogger {
// submit job
final JobGraph jobGraph = getJobGraph(plan);
- final long rpcTimeout =
configuration.get(AkkaOptions.ASK_TIMEOUT_DURATION).toMillis();
+ final long rpcTimeout =
configuration.get(RpcOptions.ASK_TIMEOUT_DURATION).toMillis();
ClusterClient<?> client = CLUSTER.getClusterClient();
JobID jobID = client.submitJob(jobGraph).get();
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
index dae5e0aef99..0fe64cf0377 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
@@ -23,9 +23,9 @@ import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple4;
-import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.RpcOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -72,8 +72,8 @@ public class EventTimeAllWindowCheckpointingITCase extends
TestLogger {
private static Configuration getConfiguration() {
Configuration config = new Configuration();
config.set(TaskManagerOptions.MANAGED_MEMORY_SIZE,
MemorySize.parse("48m"));
- config.set(AkkaOptions.LOOKUP_TIMEOUT_DURATION, Duration.ofMinutes(1));
- config.set(AkkaOptions.ASK_TIMEOUT_DURATION, Duration.ofMinutes(1));
+ config.set(RpcOptions.LOOKUP_TIMEOUT_DURATION, Duration.ofMinutes(1));
+ config.set(RpcOptions.ASK_TIMEOUT_DURATION, Duration.ofMinutes(1));
return config;
}
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
index d82b77b63cd..6405546d3d4 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
@@ -27,11 +27,11 @@ import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.changelog.fs.FsStateChangelogStorageFactory;
-import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
+import org.apache.flink.configuration.RpcOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.contrib.streaming.state.RocksDBOptions;
@@ -236,7 +236,7 @@ public class EventTimeWindowCheckpointingITCase extends
TestLogger {
final File haDir = temporaryFolder.newFolder();
Configuration config = new Configuration();
- config.set(AkkaOptions.FRAMESIZE, String.valueOf(MAX_MEM_STATE_SIZE) +
"b");
+ config.set(RpcOptions.FRAMESIZE, String.valueOf(MAX_MEM_STATE_SIZE) +
"b");
if (zkServer != null) {
config.set(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java
index 4218be61e60..80084bc3fbc 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java
@@ -42,11 +42,11 @@ import
org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.connector.source.SplitsAssignment;
import org.apache.flink.changelog.fs.FsStateChangelogStorageFactory;
-import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
+import org.apache.flink.configuration.RpcOptions;
import org.apache.flink.configuration.StateBackendOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.io.InputStatus;
@@ -785,7 +785,7 @@ public abstract class UnalignedCheckpointTestBase extends
TestLogger {
// amount of buffers
conf.set(TaskManagerOptions.NETWORK_MEMORY_MIN,
MemorySize.ofMebiBytes(32));
conf.set(TaskManagerOptions.NETWORK_MEMORY_MAX,
MemorySize.ofMebiBytes(32));
- conf.set(AkkaOptions.ASK_TIMEOUT_DURATION, Duration.ofMinutes(1));
+ conf.set(RpcOptions.ASK_TIMEOUT_DURATION, Duration.ofMinutes(1));
return conf;
}
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
index 1c484d3213a..c1a8fe9e95a 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
@@ -24,10 +24,10 @@ import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.MiniClusterClient;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.ProgramInvocationException;
-import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.RpcOptions;
import org.apache.flink.configuration.StateBackendOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.execution.SavepointFormatType;
@@ -136,7 +136,7 @@ public class ClassLoaderITCase extends TestLogger {
// some tests check for serialization problems related to class-loading
// this requires all RPCs to actually go through serialization
- config.set(AkkaOptions.FORCE_RPC_INVOCATION_SERIALIZATION, true);
+ config.set(RpcOptions.FORCE_RPC_INVOCATION_SERIALIZATION, true);
miniClusterResource =
new MiniClusterResource(
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingCustomInputSplitProgram.java
b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingCustomInputSplitProgram.java
index 993f9eaa0ce..a911f1a0fd4 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingCustomInputSplitProgram.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingCustomInputSplitProgram.java
@@ -25,8 +25,8 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RpcOptions;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.streaming.api.datastream.DataStream;
@@ -46,7 +46,7 @@ public class StreamingCustomInputSplitProgram {
public static void main(String[] args) throws Exception {
Configuration config = new Configuration();
- config.set(AkkaOptions.ASK_TIMEOUT_DURATION, Duration.ofSeconds(5));
+ config.set(RpcOptions.ASK_TIMEOUT_DURATION, Duration.ofSeconds(5));
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java
index 5e97398f775..17b73c9c847 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java
@@ -22,8 +22,8 @@ import
org.apache.flink.api.common.functions.RichMapPartitionFunction;
import org.apache.flink.api.common.io.GenericInputFormat;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RpcOptions;
import org.apache.flink.core.io.GenericInputSplit;
import org.apache.flink.runtime.testutils.MiniClusterResource;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
@@ -60,7 +60,7 @@ public class RemoteEnvironmentITCase extends TestLogger {
@Test
public void testUserSpecificParallelism() throws Exception {
Configuration config = new Configuration();
- config.set(AkkaOptions.STARTUP_TIMEOUT, VALID_STARTUP_TIMEOUT);
+ config.set(RpcOptions.STARTUP_TIMEOUT, VALID_STARTUP_TIMEOUT);
final URI restAddress = MINI_CLUSTER_RESOURCE.getRestAddress();
final String hostname = restAddress.getHost();
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
index 165e8326f16..246399b1a26 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
@@ -23,12 +23,12 @@ import
org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.client.program.ProgramInvocationException;
-import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.RpcOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.core.testutils.EachCallbackWrapper;
@@ -110,7 +110,7 @@ class ProcessFailureCancelingITCase {
Configuration config = new Configuration();
config.set(JobManagerOptions.ADDRESS, "localhost");
- config.set(AkkaOptions.ASK_TIMEOUT_DURATION, Duration.ofSeconds(100));
+ config.set(RpcOptions.ASK_TIMEOUT_DURATION, Duration.ofSeconds(100));
config.set(HighAvailabilityOptions.HA_MODE, "zookeeper");
config.set(
HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM,
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerDisconnectOnShutdownITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerDisconnectOnShutdownITCase.java
index e2c575387d5..a188c7b4379 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerDisconnectOnShutdownITCase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerDisconnectOnShutdownITCase.java
@@ -19,13 +19,13 @@
package org.apache.flink.test.recovery;
import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.configuration.HeartbeatManagerOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.RpcOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.blocklist.BlocklistUtils;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -207,7 +207,7 @@ public class TaskManagerDisconnectOnShutdownITCase {
fatalErrorHandler,
resourceManagerMetricGroup,
standaloneClusterStartupPeriodTime,
-
Time.fromDuration(configuration.get(AkkaOptions.ASK_TIMEOUT_DURATION)),
+
Time.fromDuration(configuration.get(RpcOptions.ASK_TIMEOUT_DURATION)),
ioExecutor) {
@Override
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerRunnerITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerRunnerITCase.java
index b59f1614baa..0fe8f4d5f7c 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerRunnerITCase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerRunnerITCase.java
@@ -18,10 +18,10 @@
package org.apache.flink.test.recovery;
-import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.ClusterOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.RpcOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.entrypoint.ClusterEntrypointUtils;
@@ -62,7 +62,7 @@ public class TaskManagerRunnerITCase extends TestLogger {
ClusterOptions.PROCESS_WORKING_DIR_BASE,
workingDirBase.getAbsolutePath());
configuration.set(TaskManagerOptions.TASK_MANAGER_RESOURCE_ID,
resourceId.toString());
configuration.set(JobManagerOptions.ADDRESS, "localhost");
- configuration.set(AkkaOptions.LOOKUP_TIMEOUT_DURATION, Duration.ZERO);
+ configuration.set(RpcOptions.LOOKUP_TIMEOUT_DURATION, Duration.ZERO);
final File workingDirectory =
ClusterEntrypointUtils.generateTaskManagerWorkingDirectoryFile(
@@ -99,7 +99,7 @@ public class TaskManagerRunnerITCase extends TestLogger {
configuration.set(
ClusterOptions.PROCESS_WORKING_DIR_BASE,
workingDirBase.getAbsolutePath());
configuration.set(JobManagerOptions.ADDRESS, "localhost");
- configuration.set(AkkaOptions.LOOKUP_TIMEOUT_DURATION, Duration.ZERO);
+ configuration.set(RpcOptions.LOOKUP_TIMEOUT_DURATION, Duration.ZERO);
final TestProcessBuilder.TestProcess taskManagerProcess =
new
TestProcessBuilder(TaskExecutorProcessEntryPoint.class.getName())
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/runtime/ShuffleCompressionITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/runtime/ShuffleCompressionITCase.java
index 9a092c8147a..99965735b4c 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/runtime/ShuffleCompressionITCase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/runtime/ShuffleCompressionITCase.java
@@ -20,9 +20,9 @@ package org.apache.flink.test.runtime;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.ExecutionMode;
-import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
+import org.apache.flink.configuration.RpcOptions;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.reader.MutableRecordReader;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
@@ -85,7 +85,7 @@ public class ShuffleCompressionITCase {
public void testNoDataCompressionForBoundedBlockingShuffle() throws
Exception {
Configuration configuration = new Configuration();
configuration.set(NettyShuffleEnvironmentOptions.BATCH_SHUFFLE_COMPRESSION_ENABLED,
false);
- configuration.set(AkkaOptions.ASK_TIMEOUT_DURATION,
Duration.ofMinutes(1));
+ configuration.set(RpcOptions.ASK_TIMEOUT_DURATION,
Duration.ofMinutes(1));
configuration.set(
NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_PARALLELISM,
Integer.MAX_VALUE);
@@ -98,7 +98,7 @@ public class ShuffleCompressionITCase {
public void testNoDataCompressionForSortMergeBlockingShuffle() throws
Exception {
Configuration configuration = new Configuration();
configuration.set(NettyShuffleEnvironmentOptions.BATCH_SHUFFLE_COMPRESSION_ENABLED,
false);
- configuration.set(AkkaOptions.ASK_TIMEOUT_DURATION,
Duration.ofMinutes(1));
+ configuration.set(RpcOptions.ASK_TIMEOUT_DURATION,
Duration.ofMinutes(1));
JobGraph jobGraph = createJobGraph(ResultPartitionType.BLOCKING,
ExecutionMode.BATCH);
JobGraphRunningUtil.execute(jobGraph, configuration, NUM_TASKMANAGERS,
NUM_SLOTS);
diff --git
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNApplicationITCase.java
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNApplicationITCase.java
index de4cb0c5b03..05f5d25638f 100644
---
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNApplicationITCase.java
+++
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNApplicationITCase.java
@@ -21,12 +21,12 @@ package org.apache.flink.yarn;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.deployment.application.ApplicationConfiguration;
import org.apache.flink.client.program.ClusterClient;
-import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.PipelineOptions;
+import org.apache.flink.configuration.RpcOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.flink.yarn.configuration.YarnDeploymentTarget;
@@ -131,7 +131,7 @@ class YARNApplicationITCase extends YarnTestBase {
Configuration configuration = new Configuration();
configuration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY,
MemorySize.ofMebiBytes(768));
configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY,
MemorySize.parse("1g"));
- configuration.set(AkkaOptions.ASK_TIMEOUT_DURATION,
Duration.ofSeconds(30));
+ configuration.set(RpcOptions.ASK_TIMEOUT_DURATION,
Duration.ofSeconds(30));
configuration.set(DeploymentOptions.TARGET,
YarnDeploymentTarget.APPLICATION.getName());
configuration.set(CLASSPATH_INCLUDE_USER_JAR, userJarInclusion);
configuration.set(PipelineOptions.JARS,
Collections.singletonList(userJar.toString()));
diff --git
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNFileReplicationITCase.java
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNFileReplicationITCase.java
index 61bb1f50421..84975185d3b 100644
---
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNFileReplicationITCase.java
+++
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNFileReplicationITCase.java
@@ -20,10 +20,10 @@ package org.apache.flink.yarn;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.program.ClusterClient;
-import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.RpcOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmaster.JobResult;
@@ -152,7 +152,7 @@ class YARNFileReplicationITCase extends YarnTestBase {
final Configuration configuration = new Configuration();
configuration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY,
MemorySize.ofMebiBytes(768));
configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY,
MemorySize.parse("1g"));
- configuration.set(AkkaOptions.ASK_TIMEOUT_DURATION,
Duration.ofSeconds(30));
+ configuration.set(RpcOptions.ASK_TIMEOUT_DURATION,
Duration.ofSeconds(30));
configuration.set(CLASSPATH_INCLUDE_USER_JAR,
YarnConfigOptions.UserJarInclusion.DISABLED);
return configuration;
diff --git
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
index 551f355adea..4a1963ced5e 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
@@ -21,10 +21,10 @@ package org.apache.flink.yarn;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.program.ClusterClient;
-import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.RpcOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobType;
@@ -214,7 +214,7 @@ class YARNITCase extends YarnTestBase {
Configuration configuration = new Configuration();
configuration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY,
MemorySize.ofMebiBytes(768));
configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY,
MemorySize.parse("1g"));
- configuration.set(AkkaOptions.ASK_TIMEOUT_DURATION,
Duration.ofSeconds(30));
+ configuration.set(RpcOptions.ASK_TIMEOUT_DURATION,
Duration.ofSeconds(30));
configuration.set(CLASSPATH_INCLUDE_USER_JAR, userJarInclusion);
return configuration;
diff --git
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
index ab8986adccf..48efd848e07 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
@@ -19,8 +19,8 @@
package org.apache.flink.yarn;
import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RpcOptions;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.configuration.TaskManagerOptionsInternal;
@@ -118,7 +118,7 @@ public class YarnTaskExecutorRunner {
LOG.info("TM: keytab principal obtained {}", keytabPrincipal);
// tell pekko to die in case of an error
- configuration.set(AkkaOptions.JVM_EXIT_ON_FATAL_ERROR, true);
+ configuration.set(RpcOptions.JVM_EXIT_ON_FATAL_ERROR, true);
String keytabPath = Utils.resolveKeytabPath(currDir, localKeytabPath);
diff --git
a/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
b/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
index b183fd2604e..f596e476a6f 100644
---
a/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
+++
b/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
@@ -27,13 +27,13 @@ import
org.apache.flink.client.deployment.ClusterClientFactory;
import org.apache.flink.client.deployment.ClusterClientServiceLoader;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
-import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.RpcOptions;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.util.ConfigurationException;
@@ -100,7 +100,7 @@ class FlinkYarnSessionCliTest {
"-j",
"fake.jar",
"-D",
- AkkaOptions.ASK_TIMEOUT_DURATION.key() + "=5 min",
+ RpcOptions.ASK_TIMEOUT_DURATION.key() + "=5 min",
"-D",
CoreOptions.FLINK_JVM_OPTIONS.key() +
"=-DappName=foobar",
"-D",
@@ -108,7 +108,7 @@ class FlinkYarnSessionCliTest {
});
Configuration executorConfig = cli.toConfiguration(cmd);
-
assertThat(executorConfig.get(AkkaOptions.ASK_TIMEOUT_DURATION)).hasMinutes(5);
+
assertThat(executorConfig.get(RpcOptions.ASK_TIMEOUT_DURATION)).hasMinutes(5);
assertThat(executorConfig.get(CoreOptions.FLINK_JVM_OPTIONS)).isEqualTo("-DappName=foobar");
assertThat(executorConfig.get(SecurityOptions.SSL_INTERNAL_KEY_PASSWORD))
.isEqualTo("changeit");