zentol commented on code in PR #22271:
URL: https://github.com/apache/flink/pull/22271#discussion_r1150251774


##########
flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaUtils.java:
##########
@@ -206,79 +203,30 @@ private static void addBaseRemoteAkkaConfig(
             Configuration configuration,
             int port,
             int externalPort) {
-        final Duration akkaAskTimeout = 
configuration.get(AkkaOptions.ASK_TIMEOUT_DURATION);
-
-        final String startupTimeout =
-                TimeUtils.getStringInMillis(
-                        TimeUtils.parseDuration(
-                                configuration.getString(
-                                        AkkaOptions.STARTUP_TIMEOUT,
-                                        TimeUtils.getStringInMillis(
-                                                
akkaAskTimeout.multipliedBy(10L)))));
-
         final String akkaTCPTimeout =
                 TimeUtils.getStringInMillis(
                         
TimeUtils.parseDuration(configuration.getString(AkkaOptions.TCP_TIMEOUT)));
 
         final String akkaFramesize = 
configuration.getString(AkkaOptions.FRAMESIZE);
 
-        final int clientSocketWorkerPoolPoolSizeMin =
-                
configuration.get(AkkaOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_MIN);
-        final int clientSocketWorkerPoolPoolSizeMax =
-                
configuration.get(AkkaOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_MAX);
-        final double clientSocketWorkerPoolPoolSizeFactor =
-                
configuration.get(AkkaOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_FACTOR);
-        final int serverSocketWorkerPoolPoolSizeMin =
-                
configuration.get(AkkaOptions.SERVER_SOCKET_WORKER_POOL_SIZE_MIN);
-        final int serverSocketWorkerPoolPoolSizeMax =
-                
configuration.get(AkkaOptions.SERVER_SOCKET_WORKER_POOL_SIZE_MAX);
-        final double serverSocketWorkerPoolPoolSizeFactor =
-                
configuration.get(AkkaOptions.SERVER_SOCKET_WORKER_POOL_SIZE_FACTOR);
-
-        final String logLifecycleEvents =
-                
booleanToOnOrOff(configuration.getBoolean(AkkaOptions.LOG_LIFECYCLE_EVENTS));
-
-        final long retryGateClosedFor = 
configuration.getLong(AkkaOptions.RETRY_GATE_CLOSED_FOR);
+        final String outboundRestartBackoff =
+                configuration.getString(AkkaOptions.OUTBOUND_RESTART_BACKOFF);
 
         akkaConfigBuilder
                 .add("akka {")
                 .add("  actor {")
-                .add("    provider = \"akka.remote.RemoteActorRefProvider\"")
+                .add("    provider = remote")
                 .add("  }")
-                .add("  remote.artery.enabled = false")
-                .add("  remote.startup-timeout = " + startupTimeout)
                 .add("  remote.warn-about-direct-use = off")
                 .add("  remote.use-unsafe-remote-features-outside-cluster = 
on")
-                .add("  remote.classic {")
-                .add("    # disable the transport failure detector by setting 
very high values")
-                .add("    transport-failure-detector{")
-                .add("      acceptable-heartbeat-pause = 6000 s")
-                .add("      heartbeat-interval = 1000 s")
-                .add("      threshold = 300")
+                .add("  remote.artery {")
+                .add("    canonical.port = " + externalPort)
+                .add("    bind.port = " + port)
+                .add("    advanced {")
+                .add("      maximum-frame-size = " + akkaFramesize)
+                .add("      outbound-restart-backoff = " + 
outboundRestartBackoff)
+                .add("      tcp.connection-timeout = " + akkaTCPTimeout)
                 .add("    }")
-                .add("    enabled-transports = 
[\"akka.remote.classic.netty.tcp\"]")
-                .add("    netty {")
-                .add("      tcp {")
-                .add("        transport-class = 
\"akka.remote.transport.netty.NettyTransport\"")
-                .add("        port = " + externalPort)
-                .add("        bind-port = " + port)
-                .add("        connection-timeout = " + akkaTCPTimeout)
-                .add("        maximum-frame-size = " + akkaFramesize)
-                .add("        tcp-nodelay = on")
-                .add("        client-socket-worker-pool {")
-                .add("          pool-size-min = " + 
clientSocketWorkerPoolPoolSizeMin)
-                .add("          pool-size-max = " + 
clientSocketWorkerPoolPoolSizeMax)
-                .add("          pool-size-factor = " + 
clientSocketWorkerPoolPoolSizeFactor)
-                .add("        }")
-                .add("        server-socket-worker-pool {")
-                .add("          pool-size-min = " + 
serverSocketWorkerPoolPoolSizeMin)
-                .add("          pool-size-max = " + 
serverSocketWorkerPoolPoolSizeMax)
-                .add("          pool-size-factor = " + 
serverSocketWorkerPoolPoolSizeFactor)
-                .add("        }")
-                .add("      }")
-                .add("    }")
-                .add("    log-remote-lifecycle-events = " + logLifecycleEvents)

Review Comment:
   These options seem to do something different from the lifecycle-event 
option.
   
   To replicate the removed `log-remote-lifecycle-events` option, we may have 
to manually subscribe to these events and log them ourselves, as shown in 
https://github.com/akka/akka/issues/28003#issuecomment-560349926.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to