ferenc-csaky commented on code in PR #22271:
URL: https://github.com/apache/flink/pull/22271#discussion_r1149150412
##########
flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaUtils.java:
##########
@@ -206,79 +203,30 @@ private static void addBaseRemoteAkkaConfig(
Configuration configuration,
int port,
int externalPort) {
- final Duration akkaAskTimeout =
configuration.get(AkkaOptions.ASK_TIMEOUT_DURATION);
-
- final String startupTimeout =
- TimeUtils.getStringInMillis(
- TimeUtils.parseDuration(
- configuration.getString(
- AkkaOptions.STARTUP_TIMEOUT,
- TimeUtils.getStringInMillis(
-
akkaAskTimeout.multipliedBy(10L)))));
-
final String akkaTCPTimeout =
TimeUtils.getStringInMillis(
TimeUtils.parseDuration(configuration.getString(AkkaOptions.TCP_TIMEOUT)));
final String akkaFramesize =
configuration.getString(AkkaOptions.FRAMESIZE);
- final int clientSocketWorkerPoolPoolSizeMin =
-
configuration.get(AkkaOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_MIN);
- final int clientSocketWorkerPoolPoolSizeMax =
-
configuration.get(AkkaOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_MAX);
- final double clientSocketWorkerPoolPoolSizeFactor =
-
configuration.get(AkkaOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_FACTOR);
- final int serverSocketWorkerPoolPoolSizeMin =
-
configuration.get(AkkaOptions.SERVER_SOCKET_WORKER_POOL_SIZE_MIN);
- final int serverSocketWorkerPoolPoolSizeMax =
-
configuration.get(AkkaOptions.SERVER_SOCKET_WORKER_POOL_SIZE_MAX);
- final double serverSocketWorkerPoolPoolSizeFactor =
-
configuration.get(AkkaOptions.SERVER_SOCKET_WORKER_POOL_SIZE_FACTOR);
-
- final String logLifecycleEvents =
-
booleanToOnOrOff(configuration.getBoolean(AkkaOptions.LOG_LIFECYCLE_EVENTS));
-
- final long retryGateClosedFor =
configuration.getLong(AkkaOptions.RETRY_GATE_CLOSED_FOR);
+ final String outboundRestartBackoff =
+ configuration.getString(AkkaOptions.OUTBOUND_RESTART_BACKOFF);
akkaConfigBuilder
.add("akka {")
.add(" actor {")
- .add(" provider = \"akka.remote.RemoteActorRefProvider\"")
+ .add(" provider = remote")
.add(" }")
- .add(" remote.artery.enabled = false")
- .add(" remote.startup-timeout = " + startupTimeout)
.add(" remote.warn-about-direct-use = off")
.add(" remote.use-unsafe-remote-features-outside-cluster =
on")
- .add(" remote.classic {")
- .add(" # disable the transport failure detector by setting
very high values")
- .add(" transport-failure-detector{")
- .add(" acceptable-heartbeat-pause = 6000 s")
- .add(" heartbeat-interval = 1000 s")
- .add(" threshold = 300")
+ .add(" remote.artery {")
+ .add(" canonical.port = " + externalPort)
+ .add(" bind.port = " + port)
+ .add(" advanced {")
+ .add(" maximum-frame-size = " + akkaFramesize)
+ .add(" outbound-restart-backoff = " +
outboundRestartBackoff)
+ .add(" tcp.connection-timeout = " + akkaTCPTimeout)
.add(" }")
- .add(" enabled-transports =
[\"akka.remote.classic.netty.tcp\"]")
- .add(" netty {")
- .add(" tcp {")
- .add(" transport-class =
\"akka.remote.transport.netty.NettyTransport\"")
- .add(" port = " + externalPort)
- .add(" bind-port = " + port)
- .add(" connection-timeout = " + akkaTCPTimeout)
- .add(" maximum-frame-size = " + akkaFramesize)
- .add(" tcp-nodelay = on")
- .add(" client-socket-worker-pool {")
- .add(" pool-size-min = " +
clientSocketWorkerPoolPoolSizeMin)
- .add(" pool-size-max = " +
clientSocketWorkerPoolPoolSizeMax)
- .add(" pool-size-factor = " +
clientSocketWorkerPoolPoolSizeFactor)
- .add(" }")
- .add(" server-socket-worker-pool {")
- .add(" pool-size-min = " +
serverSocketWorkerPoolPoolSizeMin)
- .add(" pool-size-max = " +
serverSocketWorkerPoolPoolSizeMax)
- .add(" pool-size-factor = " +
serverSocketWorkerPoolPoolSizeFactor)
- .add(" }")
- .add(" }")
- .add(" }")
- .add(" log-remote-lifecycle-events = " + logLifecycleEvents)
Review Comment:
Does not exist in Artery. What we can control in artery are these options:
```yaml
# If this is "on", all inbound remote messages will be logged at DEBUG
level,
# if off then they are not logged
log-received-messages = off
# If this is "on", all outbound remote messages will be logged at
DEBUG level,
# if off then they are not logged
log-sent-messages = off
# Logging of message types with payload size in bytes larger than
# this value. Maximum detected size per message type is logged once,
# with an increase threshold of 10%.
# By default this feature is turned off. Activate it by setting the
property to
# a value in bytes, such as 1000b. Note that for all messages larger
than this
# limit there will be extra performance and scalability cost.
log-frame-size-exceeding = off
```
I did not added this yet, because there are multiple options here IMO:
1. Keep the old config option in Flink and apply that to both sent and
received.
2. Deprecate the currently existing log option and add 2 for both received
and sent.
We can also consider what to do with the frame size exceeding events.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]