ferenc-csaky commented on code in PR #22271:
URL: https://github.com/apache/flink/pull/22271#discussion_r1149122919
##########
flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaUtils.java:
##########
@@ -206,79 +203,30 @@ private static void addBaseRemoteAkkaConfig(
Configuration configuration,
int port,
int externalPort) {
- final Duration akkaAskTimeout =
configuration.get(AkkaOptions.ASK_TIMEOUT_DURATION);
-
- final String startupTimeout =
- TimeUtils.getStringInMillis(
- TimeUtils.parseDuration(
- configuration.getString(
- AkkaOptions.STARTUP_TIMEOUT,
- TimeUtils.getStringInMillis(
-
akkaAskTimeout.multipliedBy(10L)))));
-
final String akkaTCPTimeout =
TimeUtils.getStringInMillis(
TimeUtils.parseDuration(configuration.getString(AkkaOptions.TCP_TIMEOUT)));
final String akkaFramesize =
configuration.getString(AkkaOptions.FRAMESIZE);
- final int clientSocketWorkerPoolPoolSizeMin =
-
configuration.get(AkkaOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_MIN);
- final int clientSocketWorkerPoolPoolSizeMax =
-
configuration.get(AkkaOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_MAX);
- final double clientSocketWorkerPoolPoolSizeFactor =
-
configuration.get(AkkaOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_FACTOR);
- final int serverSocketWorkerPoolPoolSizeMin =
-
configuration.get(AkkaOptions.SERVER_SOCKET_WORKER_POOL_SIZE_MIN);
- final int serverSocketWorkerPoolPoolSizeMax =
-
configuration.get(AkkaOptions.SERVER_SOCKET_WORKER_POOL_SIZE_MAX);
- final double serverSocketWorkerPoolPoolSizeFactor =
-
configuration.get(AkkaOptions.SERVER_SOCKET_WORKER_POOL_SIZE_FACTOR);
-
- final String logLifecycleEvents =
-
booleanToOnOrOff(configuration.getBoolean(AkkaOptions.LOG_LIFECYCLE_EVENTS));
-
- final long retryGateClosedFor =
configuration.getLong(AkkaOptions.RETRY_GATE_CLOSED_FOR);
+ final String outboundRestartBackoff =
+ configuration.getString(AkkaOptions.OUTBOUND_RESTART_BACKOFF);
akkaConfigBuilder
.add("akka {")
.add(" actor {")
- .add(" provider = \"akka.remote.RemoteActorRefProvider\"")
+ .add(" provider = remote")
.add(" }")
- .add(" remote.artery.enabled = false")
- .add(" remote.startup-timeout = " + startupTimeout)
.add(" remote.warn-about-direct-use = off")
.add(" remote.use-unsafe-remote-features-outside-cluster =
on")
- .add(" remote.classic {")
- .add(" # disable the transport failure detector by setting
very high values")
- .add(" transport-failure-detector{")
- .add(" acceptable-heartbeat-pause = 6000 s")
- .add(" heartbeat-interval = 1000 s")
- .add(" threshold = 300")
+ .add(" remote.artery {")
+ .add(" canonical.port = " + externalPort)
+ .add(" bind.port = " + port)
+ .add(" advanced {")
+ .add(" maximum-frame-size = " + akkaFramesize)
+ .add(" outbound-restart-backoff = " +
outboundRestartBackoff)
+ .add(" tcp.connection-timeout = " + akkaTCPTimeout)
.add(" }")
- .add(" enabled-transports =
[\"akka.remote.classic.netty.tcp\"]")
- .add(" netty {")
- .add(" tcp {")
- .add(" transport-class =
\"akka.remote.transport.netty.NettyTransport\"")
- .add(" port = " + externalPort)
- .add(" bind-port = " + port)
- .add(" connection-timeout = " + akkaTCPTimeout)
- .add(" maximum-frame-size = " + akkaFramesize)
- .add(" tcp-nodelay = on")
- .add(" client-socket-worker-pool {")
Review Comment:
I do not have a definitive answer at this point, but I believe so. The docs
are not very specific, so I ended up going through the full list of
[configuration
options](https://doc.akka.io/docs/akka/2.6/general/configuration-reference.html#akka-remote-artery),
which do not provide any additional thread pool configuration for Artery.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]