xintongsong commented on code in PR #21620:
URL: https://github.com/apache/flink/pull/21620#discussion_r1065498134


##########
flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleUtils.java:
##########
@@ -119,6 +142,95 @@ public static int computeNetworkBuffersForAnnouncing(
         return requirementForInputs + requirementForOutputs;
     }
 
+    public static int maxRequiredBuffersPerGate(
+            ResultPartitionType partitionType,
+            boolean isGateRequiredMaxBuffersConfigured,
+            int requiredMaxBuffersPerGate) {
+        int requiredMaxBuffers;
+        if (isGateRequiredMaxBuffersConfigured) {
+            requiredMaxBuffers = requiredMaxBuffersPerGate;
+        } else {
+            requiredMaxBuffers =
+                    partitionType.isBlockingOrBlockingPersistentResultPartition()
+                            ? DEFAULT_MAX_BUFFERS_PER_GATE_FOR_BLOCKING
+                            : DEFAULT_MAX_BUFFERS_PER_GATE_FOR_STREAM;

Review Comment:
   What about hybrid shuffle? Hybrid shuffle is also used for batch workloads, 
which according to the config option description should use the batch 
threshold.
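
   A minimal sketch of the selection this comment asks for, assuming hybrid partitions should share the blocking default. `PartitionKind` and `usesBatchDefault` are hypothetical stand-ins, not Flink's `ResultPartitionType` API:

   ```java
   final class ThresholdSelection {

       // Hypothetical stand-in for Flink's ResultPartitionType, reduced to three kinds.
       enum PartitionKind { PIPELINED, BLOCKING, HYBRID }

       // Returns true when the batch (blocking) default threshold should apply.
       // Hybrid is grouped with blocking because both serve batch workloads.
       static boolean usesBatchDefault(PartitionKind kind) {
           return kind == PartitionKind.BLOCKING || kind == PartitionKind.HYBRID;
       }
   }
   ```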



##########
flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java:
##########
@@ -213,6 +214,7 @@ public class NettyShuffleEnvironmentOptions {
      * Number of extra network buffers to use for each outgoing/incoming gate (result
      * partition/input gate).
      */
+    @Deprecated

Review Comment:
   I think what we decided in the FLIP is not to deprecate these two options 
immediately.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java:
##########
@@ -293,20 +324,74 @@ protected InputChannel createKnownInputChannel(
                     connectionManager,
                     partitionRequestInitialBackoff,
                     partitionRequestMaxBackoff,
-                    networkBuffersPerChannel,
+                    buffersPerChannel,
                     metrics);
         }
     }
 
     @VisibleForTesting
     static SupplierWithException<BufferPool, IOException> createBufferPoolFactory(
-            BufferPoolFactory bufferPoolFactory, int floatingNetworkBuffersPerGate) {
-        Pair<Integer, Integer> pair =
-                NettyShuffleUtils.getMinMaxFloatingBuffersPerInputGate(
-                        floatingNetworkBuffersPerGate);
+            BufferPoolFactory bufferPoolFactory,
+            int minFloatingBuffersPerGate,
+            int maxFloatingBuffersPerGate) {
+        Pair<Integer, Integer> pair = Pair.of(minFloatingBuffersPerGate, maxFloatingBuffersPerGate);
         return () -> bufferPoolFactory.createBufferPool(pair.getLeft(), pair.getRight());
     }
 
+    /**
+     * Based on whether the used exclusive buffers exceed the threshold, decide whether all buffers
+     * in the gate use Floating Buffers.
+     *
+     * <p>The threshold is configured by {@link
+     * NettyShuffleEnvironmentOptions#NETWORK_READ_BUFFERS_REQUIRED_PER_GATE_MAX}. If the option is
+     * not configured, the threshold for Blocking jobs is {@link
+     * NettyShuffleUtils#DEFAULT_MAX_BUFFERS_PER_GATE_FOR_BLOCKING} and the threshold for Streaming
+     * jobs is {#link NettyShuffleUtils#DEFAULT_MAX_BUFFERS_PER_GATE_FOR_STREAMING}.
+     */
+    protected class GateBuffersNumCalculator {
+
+        private final int minFloatingBuffers;
+
+        private final int maxFloatingBuffers;
+
+        private final int exclusiveBuffersPerChannel;
+
+        GateBuffersNumCalculator(ResultPartitionType partitionType, int numInputChannels) {
+            int maxGateBuffersThreshold =
+                    maxRequiredBuffersPerGate(
+                            partitionType,
+                            isGateRequiredMaxBuffersConfigured,
+                            requiredMaxBuffersPerGate);
+
+            int adjustedBuffersPerChannel =
+                    adjustExclusiveBuffersPerChannel(
+                            networkBuffersPerChannel, numInputChannels, maxGateBuffersThreshold);
+            boolean useFloatingBuffer = adjustedBuffersPerChannel < 0;
+
+            this.minFloatingBuffers = useFloatingBuffer ? maxGateBuffersThreshold : 1;

Review Comment:
   This is incorrect.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleUtils.java:
##########
@@ -119,6 +142,95 @@ public static int computeNetworkBuffersForAnnouncing(
         return requirementForInputs + requirementForOutputs;
     }
 
+    public static int maxRequiredBuffersPerGate(
+            ResultPartitionType partitionType,
+            boolean isGateRequiredMaxBuffersConfigured,
+            int requiredMaxBuffersPerGate) {
+        int requiredMaxBuffers;
+        if (isGateRequiredMaxBuffersConfigured) {
+            requiredMaxBuffers = requiredMaxBuffersPerGate;
+        } else {
+            requiredMaxBuffers =
+                    partitionType.isBlockingOrBlockingPersistentResultPartition()
+                            ? DEFAULT_MAX_BUFFERS_PER_GATE_FOR_BLOCKING
+                            : DEFAULT_MAX_BUFFERS_PER_GATE_FOR_STREAM;
+        }
+        checkState(requiredMaxBuffers >= 0, "Max required buffers per gate must be non-negative.");
+        return requiredMaxBuffers;
+    }
+
+    public static int getMaxFloatingBuffersInGate(
+            int numInputChannels, int networkBuffersPerChannel, int floatingNetworkBuffersPerGate) {
+        return getExclusiveBuffersInGate(numInputChannels, networkBuffersPerChannel)
+                + floatingNetworkBuffersPerGate;
+    }
+
+    private static int getNumBuffersToAnnounceForInputGates(
+            ResultPartitionType type,
+            int numBuffersPerChannel,
+            int numFloatingBuffersPerGate,
+            boolean isGateRequiredMaxBuffersConfigured,
+            int requiredMaxBuffersPerGate,
+            int numInputChannels) {
+        int maxGateBuffersThreshold =
+                maxRequiredBuffersPerGate(
+                        type, isGateRequiredMaxBuffersConfigured, requiredMaxBuffersPerGate);
+
+        int adjustedBuffersPerChannel =
+                adjustExclusiveBuffersPerChannel(
+                        numBuffersPerChannel, numInputChannels, maxGateBuffersThreshold);
+        boolean useFloatingBuffer = adjustedBuffersPerChannel < 0;
+
+        int maxFloatingBuffers =
+                useFloatingBuffer
+                        ? getMaxFloatingBuffersInGate(
+                                numInputChannels, numBuffersPerChannel, numFloatingBuffersPerGate)
+                        : numFloatingBuffersPerGate;
+        int exclusiveBuffersPerChannel = useFloatingBuffer ? 0 : adjustedBuffersPerChannel;
+
+        return exclusiveBuffersPerChannel * numInputChannels + maxFloatingBuffers;
+    }
+
+    /**
+     * Adjusting the exclusive network buffers based on whether the total exclusive buffers in one
+     * gate has exceeded the gate buffer threshold.
+     *
+     * @return Adjusted buffers or -1. Return -1 if and only if the total exclusive buffers will
+     *     exceed the gate buffer threshold though the exclusive network buffers per channel is 1.
+     *     Returning -1 indicates all reading buffers in gate should use floating buffers.
+     */
+    public static int adjustExclusiveBuffersPerChannel(
+            int numBuffersPerChannel, int numInputChannels, int maxGateBuffersThreshold) {
+        if (numBuffersPerChannel == 0) {
+            return 0;
+        }
+
+        int adjustedBuffersPerChannel = -1;
+        for (int i = numBuffersPerChannel; i > 0; i--) {
+            if (getExclusiveBuffersInGate(numInputChannels, i) <= maxGateBuffersThreshold) {
+                adjustedBuffersPerChannel = i;
+                break;
+            }
+        }
+        return adjustedBuffersPerChannel;
+    }

Review Comment:
   The whole method can be simplified as:
   ```
   return Math.min(numBuffersPerChannel, maxGateBuffersThreshold / numInputChannels);
   ```
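
   For comparison, a self-contained sketch of the loop from the diff next to the suggested one-liner. It assumes `getExclusiveBuffersInGate` is simply channels times buffers per channel and that `numInputChannels > 0`; the class and method names are illustrative only:

   ```java
   final class ExclusiveBufferAdjustment {

       // Assumption: exclusive buffers in a gate = channels * buffers per channel.
       static int exclusiveBuffersInGate(int numInputChannels, int buffersPerChannel) {
           return numInputChannels * buffersPerChannel;
       }

       // Loop-based version, mirroring the diff under review.
       static int adjustWithLoop(
               int numBuffersPerChannel, int numInputChannels, int maxGateBuffersThreshold) {
           if (numBuffersPerChannel == 0) {
               return 0;
           }
           for (int i = numBuffersPerChannel; i > 0; i--) {
               if (exclusiveBuffersInGate(numInputChannels, i) <= maxGateBuffersThreshold) {
                   return i;
               }
           }
           return -1; // even one buffer per channel exceeds the threshold
       }

       // Closed form from the review comment; requires numInputChannels > 0.
       static int adjustWithMin(
               int numBuffersPerChannel, int numInputChannels, int maxGateBuffersThreshold) {
           return Math.min(numBuffersPerChannel, maxGateBuffersThreshold / numInputChannels);
       }
   }
   ```

   The two agree whenever at least one exclusive buffer per channel fits under the threshold. When none fits, the loop returns -1 while the closed form returns 0, so a caller testing `adjustedBuffersPerChannel < 0` would presumably need to test for 0 instead.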



##########
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java:
##########
@@ -282,6 +300,17 @@ public static NettyShuffleEnvironmentConfiguration fromConfiguration(
                 configuration.getInteger(
                         NettyShuffleEnvironmentOptions.NETWORK_EXTRA_BUFFERS_PER_GATE);
 
+        boolean isRequiredMaxBuffersConfigured =
+                configuration.contains(
+                        NettyShuffleEnvironmentOptions.NETWORK_READ_BUFFERS_REQUIRED_PER_GATE_MAX);
+
+        int requiredMaxBuffersPerGate =
+                isRequiredMaxBuffersConfigured
+                        ? configuration.getInteger(
+                                NettyShuffleEnvironmentOptions
+                                        .NETWORK_READ_BUFFERS_REQUIRED_PER_GATE_MAX)
+                        : -1;

Review Comment:
   Use `configuration.getOptional(NettyShuffleEnvironmentOptions.NETWORK_READ_BUFFERS_REQUIRED_PER_GATE_MAX)`,
   and you won't need two fields for this.
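
   A rough sketch of how a single `Optional<Integer>` could replace the boolean-plus-int pair downstream. It uses plain `java.util.Optional` rather than Flink classes; the class name and the two default values are placeholders for illustration, not the constants from the PR:

   ```java
   import java.util.Optional;

   final class GateBufferLimit {

       // Placeholder defaults for illustration only; not the values used in Flink.
       static final int DEFAULT_MAX_FOR_BLOCKING = 1000;
       static final int DEFAULT_MAX_FOR_STREAM = Integer.MAX_VALUE;

       // One Optional carries both "is it configured?" and "what is the value?".
       static int maxRequiredBuffersPerGate(boolean isBlocking, Optional<Integer> configuredMax) {
           int max =
                   configuredMax.orElseGet(
                           () -> isBlocking ? DEFAULT_MAX_FOR_BLOCKING : DEFAULT_MAX_FOR_STREAM);
           if (max < 0) {
               throw new IllegalStateException(
                       "Max required buffers per gate must be non-negative.");
           }
           return max;
       }
   }
   ```

   With this shape the `-1` sentinel for "not configured" is no longer needed, since an empty `Optional` expresses that directly.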



##########
flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleUtils.java:
##########
@@ -119,6 +142,95 @@ public static int computeNetworkBuffersForAnnouncing(
         return requirementForInputs + requirementForOutputs;
     }
 
+    public static int maxRequiredBuffersPerGate(
+            ResultPartitionType partitionType,
+            boolean isGateRequiredMaxBuffersConfigured,
+            int requiredMaxBuffersPerGate) {
+        int requiredMaxBuffers;
+        if (isGateRequiredMaxBuffersConfigured) {
+            requiredMaxBuffers = requiredMaxBuffersPerGate;
+        } else {
+            requiredMaxBuffers =
+                    partitionType.isBlockingOrBlockingPersistentResultPartition()
+                            ? DEFAULT_MAX_BUFFERS_PER_GATE_FOR_BLOCKING
+                            : DEFAULT_MAX_BUFFERS_PER_GATE_FOR_STREAM;
+        }
+        checkState(requiredMaxBuffers >= 0, "Max required buffers per gate must be non-negative.");
+        return requiredMaxBuffers;
+    }
+
+    public static int getMaxFloatingBuffersInGate(
+            int numInputChannels, int networkBuffersPerChannel, int floatingNetworkBuffersPerGate) {
+        return getExclusiveBuffersInGate(numInputChannels, networkBuffersPerChannel)
+                + floatingNetworkBuffersPerGate;
+    }

Review Comment:
   Incorrect.


