xintongsong commented on a change in pull request #18417:
URL: https://github.com/apache/flink/pull/18417#discussion_r791620680



##########
File path: flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java
##########
@@ -286,6 +286,26 @@
                                     + " by configured memory limits, but some resource frameworks like yarn would track this memory usage and kill the container once"
                                     + " memory exceeding some threshold. Also note that this option is experimental and might be changed future.");
 
+    /**
+     * Whether to reuse tcp connections across multi jobs. If set to true, tcp connections will
+     * become a cluster level resource and will not be released after job finishes.
+     *
+     * <p>Note: To avoid connection leak, you must set {@link #MAX_NUM_TCP_CONNECTIONS} to a smaller
+     * value before you enable tcp connection reuse.
+     */
+    @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
+    public static final ConfigOption<Boolean> TCP_CONNECTION_REUSE_ENABLED =
+            key("taskmanager.network.tcp-connection.enable-reuse")

Review comment:
       The config key should indicate that the connection is reused across jobs.
   The same applies to the corresponding variable names.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClient.java
##########
@@ -222,20 +226,33 @@ public void close(RemoteInputChannel inputChannel) throws IOException {
 
         clientHandler.removeInputChannel(inputChannel);
 
-        if (closeReferenceCounter.decrement()) {
-            // Close the TCP connection. Send a close request msg to ensure
-            // that outstanding backwards task events are not discarded.
-            tcpChannel
-                    .writeAndFlush(new NettyMessage.CloseRequest())
-                    .addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
-
-            // Make sure to remove the client from the factory
-            clientFactory.destroyPartitionRequestClient(connectionId, this);
+        if (closeReferenceCounter.decrement(canDispose())) {

Review comment:
       It's hard to understand why a reference counter needs to know whether the connection can be disposed or not.
   
   IIUC, the conditions required for closing a connection are:
   1. the reference counter reaches 0
   2. the connection should not be reused (reuse is disabled, or the channel has an error)
   Condition 1) is the same as before, and 2) is introduced by this PR.
   
   In that sense, we could make the logic easier to understand with something like:
   ```
   if (closeReferenceCounter.decrement() && !canBeReused()) {
       closeConnection();
   } else {
       ....
   }
   ```
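A minimal sketch of that structure, keeping the two conditions separate. All names here (`ReusableConnectionClient`, `canBeReused`, `closeConnection`) are hypothetical stand-ins, not the PR's classes; the counter is a plain `AtomicInteger` and "closing" only sets a flag.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for the client's close path; names are illustrative only. */
class ReusableConnectionClient {
    // Number of input channels currently using this connection.
    private final AtomicInteger references = new AtomicInteger();
    private final boolean reuseEnabled;
    private volatile boolean channelError;
    private volatile boolean closed;

    ReusableConnectionClient(boolean reuseEnabled) {
        this.reuseEnabled = reuseEnabled;
    }

    void acquire() {
        references.incrementAndGet();
    }

    void markChannelError() {
        channelError = true;
    }

    // Condition 2): keep the connection only if reuse is enabled and the channel is healthy.
    boolean canBeReused() {
        return reuseEnabled && !channelError;
    }

    void release() {
        // Condition 1): this call released the last user of the connection.
        boolean lastUser = references.decrementAndGet() == 0;
        if (lastUser && !canBeReused()) {
            closeConnection();
        }
        // Otherwise the connection stays open: other channels still use it,
        // or it is kept for later reuse.
    }

    private void closeConnection() {
        // A real client would send a close request and deregister from the factory here.
        closed = true;
    }

    boolean isClosed() {
        return closed;
    }
}
```

The counter only answers "is anyone still using this?", and the reuse policy lives in `canBeReused()`, so neither needs to know about the other.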

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/util/AtomicDisposableReferenceCounter.java
##########
@@ -94,9 +100,9 @@ public boolean isDisposed() {
         }
     }
 
-    public boolean disposeIfNotUsed() {
+    public boolean disposeIfNotUsed(boolean canDispose) {
         synchronized (lock) {
-            if (referenceCount <= disposeOnReferenceCount) {
+            if (referenceCount <= disposeOnReferenceCount && canDispose) {
                 isDisposed = true;
             }

Review comment:
       I don't know why this class was introduced in the first place, but it no longer makes sense to me.
   - The constructor with a custom `disposeOnReferenceCount` is only used in a test case.
   - Without a custom `disposeOnReferenceCount`, this class is basically an `AtomicInteger`, where `isDisposed` can be replaced by `== 0`.
   - This class introduces a constraint that the reference count can reach the "disposed state" only once. IMO this is a design flaw. It should be up to the caller to decide what the counter reaching zero means and what action to take at that point.
   
   In summary, I think the code would be significantly simplified if we got rid of this class and replaced it with a simple `AtomicInteger`, or a lightweight wrapper with shortcuts like `incAndIsZero`, `decAndIsZero`, `isZero`.
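A minimal sketch of such a wrapper, assuming the hypothetical name `ReferenceCounter` and the three shortcuts named above. Unlike the current class, nothing is latched: the count may return to zero any number of times, and each caller decides what that means.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical lightweight replacement for AtomicDisposableReferenceCounter. */
final class ReferenceCounter {
    private final AtomicInteger count = new AtomicInteger();

    /** Increments the count and reports whether it is now zero. */
    boolean incAndIsZero() {
        return count.incrementAndGet() == 0;
    }

    /** Decrements the count and reports whether it reached zero. */
    boolean decAndIsZero() {
        return count.decrementAndGet() == 0;
    }

    /** Reports whether the count is currently zero. */
    boolean isZero() {
        return count.get() == 0;
    }
}
```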

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
##########
@@ -75,6 +78,11 @@
      */
     NettyPartitionRequestClient createPartitionRequestClient(ConnectionID connectionId)
             throws IOException, InterruptedException {
+        // Proactively close idle connections with error.
+        if (connectionReuseEnabled) {
+            closeErrorChannelConnections();
+        }

Review comment:
       I'm trying to understand why a factory is responsible for closing the errored connections. IIUC, this serves as a safety net. Then:
   - Which component should normally be responsible for closing the connection?
   - Which component owns the shared connections, and thus should serve as the safety net? It's unintuitive that the connections are owned by a factory.
   - How should the safety net be triggered? Iterating over all existing connections whenever a new connection is created does not sound like a good idea. Maybe this could be triggered when a new connection cannot be created due to the max connection limit, or somehow asynchronously?
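A minimal sketch of the "trigger on limit" option, assuming a hypothetical `ConnectionPool` that owns the connections (not Flink's factory). The sweep over errored connections runs only when creating a new connection would exceed `maxConnections`, so the common path does no iteration.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Hypothetical minimal connection model. */
final class PooledConnection {
    volatile boolean errored;
    volatile boolean closed;

    void close() {
        closed = true;
    }
}

/** Hypothetical owner of the shared connections. */
final class ConnectionPool {
    private final int maxConnections;
    private final List<PooledConnection> connections = new ArrayList<>();

    ConnectionPool(int maxConnections) {
        this.maxConnections = maxConnections;
    }

    synchronized PooledConnection create() {
        if (connections.size() >= maxConnections) {
            // Safety net: sweep only when the limit would otherwise block a new connection.
            closeErroredConnections();
            if (connections.size() >= maxConnections) {
                throw new IllegalStateException("Connection limit reached: " + maxConnections);
            }
        }
        PooledConnection connection = new PooledConnection();
        connections.add(connection);
        return connection;
    }

    private void closeErroredConnections() {
        Iterator<PooledConnection> it = connections.iterator();
        while (it.hasNext()) {
            PooledConnection connection = it.next();
            if (connection.errored) {
                connection.close();
                it.remove();
            }
        }
    }

    synchronized int size() {
        return connections.size();
    }
}
```

The asynchronous alternative would instead call `closeErroredConnections()` periodically, e.g. from a `ScheduledExecutorService`.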

##########
File path: flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java
##########
@@ -286,6 +286,26 @@
                                     + " by configured memory limits, but some resource frameworks like yarn would track this memory usage and kill the container once"
                                     + " memory exceeding some threshold. Also note that this option is experimental and might be changed future.");
 
+    /**
+     * Whether to reuse tcp connections across multi jobs. If set to true, tcp connections will
+     * become a cluster level resource and will not be released after job finishes.
+     *
+     * <p>Note: To avoid connection leak, you must set {@link #MAX_NUM_TCP_CONNECTIONS} to a smaller
+     * value before you enable tcp connection reuse.
+     */
+    @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
+    public static final ConfigOption<Boolean> TCP_CONNECTION_REUSE_ENABLED =
+            key("taskmanager.network.tcp-connection.enable-reuse")

Review comment:
       Moreover, is the connection reused or shared? Or, put differently, if multiple jobs are running at the same time, would they be able to share the same connection?
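To make the distinction concrete, here is a minimal sketch with a hypothetical `ConnectionCache` (not a Flink class) keyed only by remote address. With `shared = true`, two concurrent jobs get the same connection; with `shared = false`, a cached connection is handed out again only after its previous user released it, i.e. it is reused but never shared.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical cache of connections keyed by remote address; illustrative only. */
final class ConnectionCache {
    static final class Connection {
        int users; // number of jobs currently holding this connection
    }

    private final Map<String, Connection> cache = new HashMap<>();
    private final boolean shared;

    ConnectionCache(boolean shared) {
        this.shared = shared;
    }

    synchronized Connection acquire(String remoteAddress) {
        Connection cached = cache.get(remoteAddress);
        // Shared: hand out the cached connection even if busy. Reused-only: only if idle.
        if (cached != null && (shared || cached.users == 0)) {
            cached.users++;
            return cached;
        }
        // No usable cached connection: open a dedicated one.
        Connection fresh = new Connection();
        fresh.users++;
        if (cached == null) {
            cache.put(remoteAddress, fresh);
        }
        return fresh;
    }

    synchronized void release(Connection connection) {
        connection.users--;
    }
}
```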

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java
##########
@@ -108,9 +101,7 @@ public void cancelRequestFor(InputChannelID inputChannelId) {
             return;
         }
 
-        if (cancelled.putIfAbsent(inputChannelId, inputChannelId) == null) {
-            ctx.writeAndFlush(new NettyMessage.CancelPartitionRequest(inputChannelId));
-        }

Review comment:
       Should this, and the removal of the field `cancelled`, belong to a separate hotfix commit?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.