KarmaGYZ commented on code in PR #22836:
URL: https://github.com/apache/flink/pull/22836#discussion_r1241614117


##########
flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java:
##########
@@ -491,6 +491,36 @@ public class NettyShuffleEnvironmentOptions {
                                     + " based on the platform. Note that the \"epoll\" mode can get better performance, less GC and have more advanced features which are"
                                     + " only available on modern Linux.");
 
+    @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
+    public static final ConfigOption<Integer> CLIENT_TCP_KEEP_IDLE_SECONDS =
+            key("taskmanager.network.netty.client.tcp.keepIdleSec")
+                    .intType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The Netty client tcp keepalive idle time in seconds. "
+                                    + "Note: This doesn't take into account if using netty transport type of nio and an older version of jdk 8, "

Review Comment:
   ```suggestion
                                       + "Note: This will not take effect when using netty transport type of nio with an older version of JDK 8, "
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyClient.java:
##########
@@ -152,6 +163,26 @@ private void initNioBootstrap() {
                 new NioEventLoopGroup(
                         config.getClientNumThreads(), NettyServer.getNamedThreadFactory(name));
         bootstrap.group(nioGroup).channel(NioSocketChannel.class);
+
+        config.getTcpKeepIdleInSeconds()
+                .ifPresent(idle -> setNioKeepaliveOptions(NIO_TCP_KEEPIDLE_KEY, idle));
+        config.getTcpKeepInternalInSeconds()
+                .ifPresent(interval -> setNioKeepaliveOptions(NIO_TCP_KEEPINTERVAL_KEY, interval));
+        config.getTcpKeepCount()
+                .ifPresent(count -> setNioKeepaliveOptions(NIO_TCP_KEEPCOUNT_KEY, count));
+    }
+

Review Comment:
   ```suggestion
   @SuppressWarnings("unchecked")
   ```
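
For context on the suggested annotation: the diff imports `java.lang.reflect.Field`, `java.net.SocketOption` and `jdk.net.ExtendedSocketOptions`, so the NIO path presumably resolves the keepalive options by name and casts the result to a generic `SocketOption<Integer>`, a cast the compiler cannot verify. Below is a minimal sketch of that pattern, assuming a reflective lookup. The class `NioKeepaliveLookup` and the method `lookup` are hypothetical and are not the PR's code; the body of `setNioKeepaliveOptions` is not shown in this diff.

```java
import java.lang.reflect.Field;
import java.net.SocketOption;
import java.util.Optional;

/** Hypothetical helper for illustration only; not taken from the PR. */
public class NioKeepaliveLookup {

    // JDK class that exposes TCP_KEEPIDLE, TCP_KEEPINTERVAL and TCP_KEEPCOUNT where supported.
    private static final String EXTENDED_OPTIONS_CLASS = "jdk.net.ExtendedSocketOptions";

    /**
     * Looks up a public static SocketOption field by name. Returns an empty Optional when
     * the class or the field is missing, for example on older JDK 8 builds (JDK-8194298).
     */
    @SuppressWarnings("unchecked") // Field#get returns Object, so the generic cast is unchecked.
    public static Optional<SocketOption<Integer>> lookup(String fieldName) {
        try {
            Field field = Class.forName(EXTENDED_OPTIONS_CLASS).getField(fieldName);
            return Optional.of((SocketOption<Integer>) field.get(null));
        } catch (ReflectiveOperationException | LinkageError e) {
            // Option not available on this runtime: the caller can skip it instead of failing.
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        for (String name : new String[] {"TCP_KEEPIDLE", "TCP_KEEPINTERVAL", "TCP_KEEPCOUNT"}) {
            System.out.println(name + " available: " + lookup(name).isPresent());
        }
    }
}
```

Returning an empty `Optional` rather than throwing keeps the client usable on runtimes that lack the extended options, which matches the caveat in the option descriptions.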



##########
flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java:
##########
@@ -491,6 +491,36 @@ public class NettyShuffleEnvironmentOptions {
                                     + " based on the platform. Note that the \"epoll\" mode can get better performance, less GC and have more advanced features which are"
                                     + " only available on modern Linux.");
 
+    @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
+    public static final ConfigOption<Integer> CLIENT_TCP_KEEP_IDLE_SECONDS =
+            key("taskmanager.network.netty.client.tcp.keepIdleSec")
+                    .intType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The Netty client tcp keepalive idle time in seconds. "
+                                    + "Note: This doesn't take into account if using netty transport type of nio and an older version of jdk 8, "
+                                    + "refer to https://bugs.openjdk.org/browse/JDK-8194298.");
+
+    @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
+    public static final ConfigOption<Integer> CLIENT_TCP_KEEP_INTERVAL_SECONDS =
+            key("taskmanager.network.netty.client.tcp.keepIntervalSec")
+                    .intType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The Netty client tcp keepalive interval in seconds. "
+                                    + "Note: This doesn't take into account if using netty transport type of nio and an older version of jdk 8, "
+                                    + "refer to https://bugs.openjdk.org/browse/JDK-8194298.");
+
+    @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
+    public static final ConfigOption<Integer> CLIENT_TCP_KEEP_COUNT =
+            key("taskmanager.network.netty.client.tcp.keepCount")
+                    .intType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The Netty client tcp keepalive retry counts. "

Review Comment:
   ```suggestion
                               "The maximum number of keepalive probes TCP should send before the Netty client drops the connection. "
   ```



##########
flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java:
##########
@@ -491,6 +491,36 @@ public class NettyShuffleEnvironmentOptions {
                                     + " based on the platform. Note that the \"epoll\" mode can get better performance, less GC and have more advanced features which are"
                                     + " only available on modern Linux.");
 
+    @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
+    public static final ConfigOption<Integer> CLIENT_TCP_KEEP_IDLE_SECONDS =
+            key("taskmanager.network.netty.client.tcp.keepIdleSec")
+                    .intType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The Netty client tcp keepalive idle time in seconds. "

Review Comment:
   ```suggestion
                               "The time (in seconds) the connection needs to remain idle before TCP starts sending keepalive probes. "
   ```



##########
flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java:
##########
@@ -491,6 +491,36 @@ public class NettyShuffleEnvironmentOptions {
                                     + " based on the platform. Note that the \"epoll\" mode can get better performance, less GC and have more advanced features which are"
                                     + " only available on modern Linux.");
 
+    @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
+    public static final ConfigOption<Integer> CLIENT_TCP_KEEP_IDLE_SECONDS =
+            key("taskmanager.network.netty.client.tcp.keepIdleSec")
+                    .intType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The Netty client tcp keepalive idle time in seconds. "
+                                    + "Note: This doesn't take into account if using netty transport type of nio and an older version of jdk 8, "
+                                    + "refer to https://bugs.openjdk.org/browse/JDK-8194298.");
+
+    @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
+    public static final ConfigOption<Integer> CLIENT_TCP_KEEP_INTERVAL_SECONDS =
+            key("taskmanager.network.netty.client.tcp.keepIntervalSec")
+                    .intType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The Netty client tcp keepalive interval in seconds. "

Review Comment:
   ```suggestion
                               "The time (in seconds) between individual keepalive probes. "
   ```
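
To make the three suggested descriptions concrete: on Linux, the kernel waits for the idle time, then sends probes one interval apart, and gives up after the configured number of unanswered probes. A dead peer is therefore detected after roughly idle + interval * count seconds. Below is a small sketch of that arithmetic. The class and method names are hypothetical, and exact timing depends on the operating system.

```java
/** Hypothetical helper for illustration only; not part of the PR. */
public class KeepaliveBudget {

    /**
     * Approximate number of seconds until the kernel drops a connection to a silent peer:
     * the idle time before the first probe, plus one interval per unanswered probe.
     */
    public static long worstCaseDetectionSeconds(int idleSec, int intervalSec, int probeCount) {
        if (idleSec < 0 || intervalSec < 0 || probeCount < 0) {
            throw new IllegalArgumentException("keepalive settings must be non-negative");
        }
        // Widen to long before multiplying so large settings cannot overflow int.
        return idleSec + (long) intervalSec * probeCount;
    }

    public static void main(String[] args) {
        // Example values for keepIdleSec, keepIntervalSec and keepCount.
        System.out.println(worstCaseDetectionSeconds(60, 10, 3));
        // Common Linux kernel defaults: 7200 s idle, 75 s interval, 9 probes.
        System.out.println(worstCaseDetectionSeconds(7200, 75, 9));
    }
}
```

With `keepIdleSec=60`, `keepIntervalSec=10` and `keepCount=3` the estimate is 90 seconds, compared with 7875 seconds for the common Linux defaults.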



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyClient.java:
##########
@@ -18,33 +18,44 @@
 
 package org.apache.flink.runtime.io.network.netty;
 
+import org.apache.flink.annotation.VisibleForTesting;
+
 import org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelException;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelOption;
 import org.apache.flink.shaded.netty4.io.netty.channel.epoll.Epoll;
+import org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollChannelOption;
 import org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoopGroup;
 import org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollSocketChannel;
 import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
 import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
+import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioChannelOption;
 import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel;
 import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler;
 
+import jdk.net.ExtendedSocketOptions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
 import java.io.IOException;
+import java.lang.reflect.Field;
 import java.net.InetSocketAddress;
+import java.net.SocketOption;
 
 import static org.apache.flink.util.Preconditions.checkState;
 
 class NettyClient {
 
     private static final Logger LOG = LoggerFactory.getLogger(NettyClient.class);
 
+    @VisibleForTesting static final String NIO_TCP_KEEPIDLE_KEY = "TCP_KEEPIDLE";
+    @VisibleForTesting static final String NIO_TCP_KEEPINTERVAL_KEY = "TCP_KEEPINTERVAL";
+    @VisibleForTesting static final String NIO_TCP_KEEPCOUNT_KEY = "TCP_KEEPCOUNT";

Review Comment:
   Why do we need the `VisibleForTesting` annotation here? Also, is it possible to add unit tests for this feature?



##########
flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java:
##########
@@ -491,6 +491,36 @@ public class NettyShuffleEnvironmentOptions {
                                     + " based on the platform. Note that the \"epoll\" mode can get better performance, less GC and have more advanced features which are"
                                     + " only available on modern Linux.");
 
+    @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
+    public static final ConfigOption<Integer> CLIENT_TCP_KEEP_IDLE_SECONDS =
+            key("taskmanager.network.netty.client.tcp.keepIdleSec")
+                    .intType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The Netty client tcp keepalive idle time in seconds. "
+                                    + "Note: This doesn't take into account if using netty transport type of nio and an older version of jdk 8, "

Review Comment:
   Same as below.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

