zentol commented on code in PR #22271:
URL: https://github.com/apache/flink/pull/22271#discussion_r1150252967
##########
flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaUtils.java:
##########
@@ -206,79 +203,30 @@ private static void addBaseRemoteAkkaConfig(
Configuration configuration,
int port,
int externalPort) {
- final Duration akkaAskTimeout =
configuration.get(AkkaOptions.ASK_TIMEOUT_DURATION);
-
- final String startupTimeout =
- TimeUtils.getStringInMillis(
- TimeUtils.parseDuration(
- configuration.getString(
- AkkaOptions.STARTUP_TIMEOUT,
- TimeUtils.getStringInMillis(
-
akkaAskTimeout.multipliedBy(10L)))));
-
final String akkaTCPTimeout =
TimeUtils.getStringInMillis(
TimeUtils.parseDuration(configuration.getString(AkkaOptions.TCP_TIMEOUT)));
final String akkaFramesize =
configuration.getString(AkkaOptions.FRAMESIZE);
- final int clientSocketWorkerPoolPoolSizeMin =
-
configuration.get(AkkaOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_MIN);
- final int clientSocketWorkerPoolPoolSizeMax =
-
configuration.get(AkkaOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_MAX);
- final double clientSocketWorkerPoolPoolSizeFactor =
-
configuration.get(AkkaOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_FACTOR);
- final int serverSocketWorkerPoolPoolSizeMin =
-
configuration.get(AkkaOptions.SERVER_SOCKET_WORKER_POOL_SIZE_MIN);
- final int serverSocketWorkerPoolPoolSizeMax =
-
configuration.get(AkkaOptions.SERVER_SOCKET_WORKER_POOL_SIZE_MAX);
- final double serverSocketWorkerPoolPoolSizeFactor =
-
configuration.get(AkkaOptions.SERVER_SOCKET_WORKER_POOL_SIZE_FACTOR);
-
- final String logLifecycleEvents =
-
booleanToOnOrOff(configuration.getBoolean(AkkaOptions.LOG_LIFECYCLE_EVENTS));
-
- final long retryGateClosedFor =
configuration.getLong(AkkaOptions.RETRY_GATE_CLOSED_FOR);
+ final String outboundRestartBackoff =
+ configuration.getString(AkkaOptions.OUTBOUND_RESTART_BACKOFF);
akkaConfigBuilder
.add("akka {")
.add(" actor {")
- .add(" provider = \"akka.remote.RemoteActorRefProvider\"")
+ .add(" provider = remote")
.add(" }")
- .add(" remote.artery.enabled = false")
- .add(" remote.startup-timeout = " + startupTimeout)
.add(" remote.warn-about-direct-use = off")
.add(" remote.use-unsafe-remote-features-outside-cluster =
on")
- .add(" remote.classic {")
- .add(" # disable the transport failure detector by setting
very high values")
- .add(" transport-failure-detector{")
- .add(" acceptable-heartbeat-pause = 6000 s")
- .add(" heartbeat-interval = 1000 s")
- .add(" threshold = 300")
+ .add(" remote.artery {")
+ .add(" canonical.port = " + externalPort)
+ .add(" bind.port = " + port)
+ .add(" advanced {")
+ .add(" maximum-frame-size = " + akkaFramesize)
+ .add(" outbound-restart-backoff = " +
outboundRestartBackoff)
+ .add(" tcp.connection-timeout = " + akkaTCPTimeout)
.add(" }")
- .add(" enabled-transports =
[\"akka.remote.classic.netty.tcp\"]")
- .add(" netty {")
- .add(" tcp {")
- .add(" transport-class =
\"akka.remote.transport.netty.NettyTransport\"")
- .add(" port = " + externalPort)
- .add(" bind-port = " + port)
- .add(" connection-timeout = " + akkaTCPTimeout)
- .add(" maximum-frame-size = " + akkaFramesize)
- .add(" tcp-nodelay = on")
- .add(" client-socket-worker-pool {")
Review Comment:
ok.
Then I assume they use one of Akka's thread pools; it would be good to know which
one that is in case we need to make it configurable.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]