ferenc-csaky commented on code in PR #22271:
URL: https://github.com/apache/flink/pull/22271#discussion_r1149150412


##########
flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaUtils.java:
##########
@@ -206,79 +203,30 @@ private static void addBaseRemoteAkkaConfig(
             Configuration configuration,
             int port,
             int externalPort) {
-        final Duration akkaAskTimeout = 
configuration.get(AkkaOptions.ASK_TIMEOUT_DURATION);
-
-        final String startupTimeout =
-                TimeUtils.getStringInMillis(
-                        TimeUtils.parseDuration(
-                                configuration.getString(
-                                        AkkaOptions.STARTUP_TIMEOUT,
-                                        TimeUtils.getStringInMillis(
-                                                
akkaAskTimeout.multipliedBy(10L)))));
-
         final String akkaTCPTimeout =
                 TimeUtils.getStringInMillis(
                         
TimeUtils.parseDuration(configuration.getString(AkkaOptions.TCP_TIMEOUT)));
 
         final String akkaFramesize = 
configuration.getString(AkkaOptions.FRAMESIZE);
 
-        final int clientSocketWorkerPoolPoolSizeMin =
-                
configuration.get(AkkaOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_MIN);
-        final int clientSocketWorkerPoolPoolSizeMax =
-                
configuration.get(AkkaOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_MAX);
-        final double clientSocketWorkerPoolPoolSizeFactor =
-                
configuration.get(AkkaOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_FACTOR);
-        final int serverSocketWorkerPoolPoolSizeMin =
-                
configuration.get(AkkaOptions.SERVER_SOCKET_WORKER_POOL_SIZE_MIN);
-        final int serverSocketWorkerPoolPoolSizeMax =
-                
configuration.get(AkkaOptions.SERVER_SOCKET_WORKER_POOL_SIZE_MAX);
-        final double serverSocketWorkerPoolPoolSizeFactor =
-                
configuration.get(AkkaOptions.SERVER_SOCKET_WORKER_POOL_SIZE_FACTOR);
-
-        final String logLifecycleEvents =
-                
booleanToOnOrOff(configuration.getBoolean(AkkaOptions.LOG_LIFECYCLE_EVENTS));
-
-        final long retryGateClosedFor = 
configuration.getLong(AkkaOptions.RETRY_GATE_CLOSED_FOR);
+        final String outboundRestartBackoff =
+                configuration.getString(AkkaOptions.OUTBOUND_RESTART_BACKOFF);
 
         akkaConfigBuilder
                 .add("akka {")
                 .add("  actor {")
-                .add("    provider = \"akka.remote.RemoteActorRefProvider\"")
+                .add("    provider = remote")
                 .add("  }")
-                .add("  remote.artery.enabled = false")
-                .add("  remote.startup-timeout = " + startupTimeout)
                 .add("  remote.warn-about-direct-use = off")
                 .add("  remote.use-unsafe-remote-features-outside-cluster = 
on")
-                .add("  remote.classic {")
-                .add("    # disable the transport failure detector by setting 
very high values")
-                .add("    transport-failure-detector{")
-                .add("      acceptable-heartbeat-pause = 6000 s")
-                .add("      heartbeat-interval = 1000 s")
-                .add("      threshold = 300")
+                .add("  remote.artery {")
+                .add("    canonical.port = " + externalPort)
+                .add("    bind.port = " + port)
+                .add("    advanced {")
+                .add("      maximum-frame-size = " + akkaFramesize)
+                .add("      outbound-restart-backoff = " + 
outboundRestartBackoff)
+                .add("      tcp.connection-timeout = " + akkaTCPTimeout)
                 .add("    }")
-                .add("    enabled-transports = 
[\"akka.remote.classic.netty.tcp\"]")
-                .add("    netty {")
-                .add("      tcp {")
-                .add("        transport-class = 
\"akka.remote.transport.netty.NettyTransport\"")
-                .add("        port = " + externalPort)
-                .add("        bind-port = " + port)
-                .add("        connection-timeout = " + akkaTCPTimeout)
-                .add("        maximum-frame-size = " + akkaFramesize)
-                .add("        tcp-nodelay = on")
-                .add("        client-socket-worker-pool {")
-                .add("          pool-size-min = " + 
clientSocketWorkerPoolPoolSizeMin)
-                .add("          pool-size-max = " + 
clientSocketWorkerPoolPoolSizeMax)
-                .add("          pool-size-factor = " + 
clientSocketWorkerPoolPoolSizeFactor)
-                .add("        }")
-                .add("        server-socket-worker-pool {")
-                .add("          pool-size-min = " + 
serverSocketWorkerPoolPoolSizeMin)
-                .add("          pool-size-max = " + 
serverSocketWorkerPoolPoolSizeMax)
-                .add("          pool-size-factor = " + 
serverSocketWorkerPoolPoolSizeFactor)
-                .add("        }")
-                .add("      }")
-                .add("    }")
-                .add("    log-remote-lifecycle-events = " + logLifecycleEvents)

Review Comment:
   `log-remote-lifecycle-events` does not exist in Artery. What we can control in Artery are these options:
   ```yaml
         # If this is "on", all inbound remote messages will be logged at DEBUG level,
         # if off then they are not logged
         log-received-messages = off
   
         # If this is "on", all outbound remote messages will be logged at DEBUG level,
         # if off then they are not logged
         log-sent-messages = off
   
         # Logging of message types with payload size in bytes larger than
         # this value. Maximum detected size per message type is logged once,
         # with an increase threshold of 10%.
         # By default this feature is turned off. Activate it by setting the property to
         # a value in bytes, such as 1000b. Note that for all messages larger than this
         # limit there will be extra performance and scalability cost.
         log-frame-size-exceeding = off
   ```
   
   I have not added this yet, because IMO there are multiple options here:
   1. Keep the old config option in Flink and apply it to both sent and received messages.
   2. Deprecate the existing log option and add two new ones, one for received and one for sent messages.
   
   We can also consider what to do with frame-size-exceeding events (the `log-frame-size-exceeding` option).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to