xintongsong commented on code in PR #21620:
URL: https://github.com/apache/flink/pull/21620#discussion_r1067771354


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java:
##########
@@ -75,7 +84,9 @@ public class SingleInputGateFactory {
 
     @Nonnull protected final NetworkBufferPool networkBufferPool;
 
-    protected final int networkBuffersPerChannel;
+    private final Optional<Integer> requiredMaxBuffersPerGate;

Review Comment:
   ```suggestion
       private final Optional<Integer> maxRequiredBuffersPerGate;
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java:
##########
@@ -293,20 +325,79 @@ protected InputChannel createKnownInputChannel(
                     connectionManager,
                     partitionRequestInitialBackoff,
                     partitionRequestMaxBackoff,
-                    networkBuffersPerChannel,
+                    buffersPerChannel,
                     metrics);
         }
     }
 
     @VisibleForTesting
     static SupplierWithException<BufferPool, IOException> 
createBufferPoolFactory(
-            BufferPoolFactory bufferPoolFactory, int 
floatingNetworkBuffersPerGate) {
-        Pair<Integer, Integer> pair =
-                NettyShuffleUtils.getMinMaxFloatingBuffersPerInputGate(
-                        floatingNetworkBuffersPerGate);
+            BufferPoolFactory bufferPoolFactory,
+            int minFloatingBuffersPerGate,
+            int maxFloatingBuffersPerGate) {
+        Pair<Integer, Integer> pair = Pair.of(minFloatingBuffersPerGate, 
maxFloatingBuffersPerGate);
         return () -> bufferPoolFactory.createBufferPool(pair.getLeft(), 
pair.getRight());
     }
 
+    /**
+     * Based on whether the used exclusive buffers exceed the threshold, 
decide whether all buffers
+     * in the gate use Floating Buffers.
+     *
+     * <p>The threshold is configured by {@link
+     * 
NettyShuffleEnvironmentOptions#NETWORK_READ_BUFFERS_REQUIRED_PER_GATE_MAX}. If 
the option is
+     * not configured, the threshold for Blocking jobs is {@link
+     * NettyShuffleUtils#DEFAULT_MAX_BUFFERS_PER_GATE_FOR_BATCH} and the 
threshold for Streaming
+     * jobs is {#link 
NettyShuffleUtils#DEFAULT_MAX_BUFFERS_PER_GATE_FOR_STREAMING}.
+     */
+    protected class GateBuffersNumCalculator {
+
+        private final int minFloatingBuffers;
+
+        private final int maxFloatingBuffers;
+
+        private final int actualExclusiveBuffersPerChannel;
+
+        GateBuffersNumCalculator(ResultPartitionType partitionType, int 
numInputChannels) {
+            int maxGateBuffersThreshold =
+                    maxRequiredBuffersPerGate(partitionType, 
requiredMaxBuffersPerGate);
+            int expectMinGateBuffers =
+                    getExpectMinBuffersPerGate(
+                            numInputChannels, 
configuredNetworkBuffersPerChannel);
+            int expectMaxGateBuffers =
+                    getExpectMaxBuffersPerGate(
+                            numInputChannels,
+                            configuredNetworkBuffersPerChannel,
+                            floatingNetworkBuffersPerGate);
+            int minGateBuffers = Math.min(maxGateBuffersThreshold, 
expectMinGateBuffers);
+
+            this.actualExclusiveBuffersPerChannel =
+                    adjustExclusiveBuffersPerChannel(
+                            configuredNetworkBuffersPerChannel,
+                            numInputChannels,
+                            maxGateBuffersThreshold);
+
+            int actualExclusiveBuffersPerGate =
+                    getActualExclusiveBuffersInGate(
+                            numInputChannels, 
actualExclusiveBuffersPerChannel);
+            this.minFloatingBuffers = Math.max(1, minGateBuffers - 
actualExclusiveBuffersPerGate);
+            this.maxFloatingBuffers =
+                    Math.max(1, expectMaxGateBuffers - 
actualExclusiveBuffersPerGate);
+            checkState(minFloatingBuffers <= maxFloatingBuffers);
+        }

Review Comment:
   The calculation logic should not be in the constructor.
   
   We can have a private constructor, and a public static factory method that 
performs the calculations.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleUtils.java:
##########
@@ -119,6 +142,93 @@ public static int computeNetworkBuffersForAnnouncing(
         return requirementForInputs + requirementForOutputs;
     }
 
+    public static int maxRequiredBuffersPerGate(
+            ResultPartitionType partitionType, Optional<Integer> 
requiredMaxBuffersPerGate) {
+        int requiredMaxBuffers =
+                requiredMaxBuffersPerGate.orElseGet(
+                        () ->
+                                isPipelineResultPartition(partitionType)
+                                        ? 
DEFAULT_MAX_BUFFERS_PER_GATE_FOR_STREAM
+                                        : 
DEFAULT_MAX_BUFFERS_PER_GATE_FOR_BATCH);
+        checkState(requiredMaxBuffers >= 0, "Max required buffers per gate 
must be non-negative.");
+        return requiredMaxBuffers;
+    }
+
+    private static int getNumBuffersToAnnounceForInputGates(
+            ResultPartitionType type,
+            int configuredNetworkBuffersPerChannel,
+            int floatingNetworkBuffersPerGate,
+            Optional<Integer> requiredMaxBuffersPerGate,
+            int numInputChannels) {
+        int maxGateBuffersThreshold = maxRequiredBuffersPerGate(type, 
requiredMaxBuffersPerGate);
+        int expectMaxGateBuffers =
+                getExpectMaxBuffersPerGate(
+                        numInputChannels,
+                        configuredNetworkBuffersPerChannel,
+                        floatingNetworkBuffersPerGate);
+
+        int actualExclusiveBuffersPerChannel =
+                adjustExclusiveBuffersPerChannel(
+                        configuredNetworkBuffersPerChannel,
+                        numInputChannels,
+                        maxGateBuffersThreshold);
+        int actualExclusiveBuffersPerGate =
+                getActualExclusiveBuffersInGate(numInputChannels, 
actualExclusiveBuffersPerChannel);
+        int maxFloatingBuffers = Math.max(1, expectMaxGateBuffers - 
actualExclusiveBuffersPerGate);
+
+        return actualExclusiveBuffersPerGate + maxFloatingBuffers;
+    }

Review Comment:
   This seems to mostly duplicate the calculation for creating 
`GateBuffersNumCalculator`.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleUtils.java:
##########
@@ -119,6 +142,93 @@ public static int computeNetworkBuffersForAnnouncing(
         return requirementForInputs + requirementForOutputs;
     }
 
+    public static int maxRequiredBuffersPerGate(

Review Comment:
   ```suggestion
       public static int getEffectiveMaxRequiredBuffersPerGate(
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java:
##########
@@ -67,6 +70,8 @@ public class NettyShuffleEnvironmentConfiguration {
      */
     private final int floatingNetworkBuffersPerGate;
 
+    private final Optional<Integer> requiredMaxBuffersPerGate;

Review Comment:
   ```suggestion
       private final Optional<Integer> maxRequiredBuffersPerGate;
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java:
##########
@@ -293,20 +325,79 @@ protected InputChannel createKnownInputChannel(
                     connectionManager,
                     partitionRequestInitialBackoff,
                     partitionRequestMaxBackoff,
-                    networkBuffersPerChannel,
+                    buffersPerChannel,
                     metrics);
         }
     }
 
     @VisibleForTesting
     static SupplierWithException<BufferPool, IOException> 
createBufferPoolFactory(
-            BufferPoolFactory bufferPoolFactory, int 
floatingNetworkBuffersPerGate) {
-        Pair<Integer, Integer> pair =
-                NettyShuffleUtils.getMinMaxFloatingBuffersPerInputGate(
-                        floatingNetworkBuffersPerGate);
+            BufferPoolFactory bufferPoolFactory,
+            int minFloatingBuffersPerGate,
+            int maxFloatingBuffersPerGate) {
+        Pair<Integer, Integer> pair = Pair.of(minFloatingBuffersPerGate, 
maxFloatingBuffersPerGate);
         return () -> bufferPoolFactory.createBufferPool(pair.getLeft(), 
pair.getRight());
     }
 
+    /**
+     * Based on whether the used exclusive buffers exceed the threshold, 
decide whether all buffers
+     * in the gate use Floating Buffers.
+     *
+     * <p>The threshold is configured by {@link
+     * 
NettyShuffleEnvironmentOptions#NETWORK_READ_BUFFERS_REQUIRED_PER_GATE_MAX}. If 
the option is
+     * not configured, the threshold for Blocking jobs is {@link
+     * NettyShuffleUtils#DEFAULT_MAX_BUFFERS_PER_GATE_FOR_BATCH} and the 
threshold for Streaming
+     * jobs is {#link 
NettyShuffleUtils#DEFAULT_MAX_BUFFERS_PER_GATE_FOR_STREAMING}.
+     */
+    protected class GateBuffersNumCalculator {
+
+        private final int minFloatingBuffers;
+
+        private final int maxFloatingBuffers;
+
+        private final int actualExclusiveBuffersPerChannel;
+
+        GateBuffersNumCalculator(ResultPartitionType partitionType, int 
numInputChannels) {
+            int maxGateBuffersThreshold =
+                    maxRequiredBuffersPerGate(partitionType, 
requiredMaxBuffersPerGate);
+            int expectMinGateBuffers =
+                    getExpectMinBuffersPerGate(
+                            numInputChannels, 
configuredNetworkBuffersPerChannel);
+            int expectMaxGateBuffers =
+                    getExpectMaxBuffersPerGate(
+                            numInputChannels,
+                            configuredNetworkBuffersPerChannel,
+                            floatingNetworkBuffersPerGate);
+            int minGateBuffers = Math.min(maxGateBuffersThreshold, 
expectMinGateBuffers);
+
+            this.actualExclusiveBuffersPerChannel =
+                    adjustExclusiveBuffersPerChannel(
+                            configuredNetworkBuffersPerChannel,
+                            numInputChannels,
+                            maxGateBuffersThreshold);
+
+            int actualExclusiveBuffersPerGate =
+                    getActualExclusiveBuffersInGate(
+                            numInputChannels, 
actualExclusiveBuffersPerChannel);
+            this.minFloatingBuffers = Math.max(1, minGateBuffers - 
actualExclusiveBuffersPerGate);
+            this.maxFloatingBuffers =
+                    Math.max(1, expectMaxGateBuffers - 
actualExclusiveBuffersPerGate);

Review Comment:
   Same here.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java:
##########
@@ -293,20 +325,79 @@ protected InputChannel createKnownInputChannel(
                     connectionManager,
                     partitionRequestInitialBackoff,
                     partitionRequestMaxBackoff,
-                    networkBuffersPerChannel,
+                    buffersPerChannel,
                     metrics);
         }
     }
 
     @VisibleForTesting
     static SupplierWithException<BufferPool, IOException> 
createBufferPoolFactory(
-            BufferPoolFactory bufferPoolFactory, int 
floatingNetworkBuffersPerGate) {
-        Pair<Integer, Integer> pair =
-                NettyShuffleUtils.getMinMaxFloatingBuffersPerInputGate(
-                        floatingNetworkBuffersPerGate);
+            BufferPoolFactory bufferPoolFactory,
+            int minFloatingBuffersPerGate,
+            int maxFloatingBuffersPerGate) {
+        Pair<Integer, Integer> pair = Pair.of(minFloatingBuffersPerGate, 
maxFloatingBuffersPerGate);
         return () -> bufferPoolFactory.createBufferPool(pair.getLeft(), 
pair.getRight());
     }
 
+    /**
+     * Based on whether the used exclusive buffers exceed the threshold, 
decide whether all buffers
+     * in the gate use Floating Buffers.
+     *
+     * <p>The threshold is configured by {@link
+     * 
NettyShuffleEnvironmentOptions#NETWORK_READ_BUFFERS_REQUIRED_PER_GATE_MAX}. If 
the option is
+     * not configured, the threshold for Blocking jobs is {@link
+     * NettyShuffleUtils#DEFAULT_MAX_BUFFERS_PER_GATE_FOR_BATCH} and the 
threshold for Streaming
+     * jobs is {#link 
NettyShuffleUtils#DEFAULT_MAX_BUFFERS_PER_GATE_FOR_STREAMING}.
+     */
+    protected class GateBuffersNumCalculator {

Review Comment:
   ```suggestion
       protected class GateBuffersSpec {
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleUtils.java:
##########
@@ -119,6 +142,93 @@ public static int computeNetworkBuffersForAnnouncing(
         return requirementForInputs + requirementForOutputs;
     }
 
+    public static int maxRequiredBuffersPerGate(
+            ResultPartitionType partitionType, Optional<Integer> 
requiredMaxBuffersPerGate) {
+        int requiredMaxBuffers =
+                requiredMaxBuffersPerGate.orElseGet(
+                        () ->
+                                isPipelineResultPartition(partitionType)
+                                        ? 
DEFAULT_MAX_BUFFERS_PER_GATE_FOR_STREAM
+                                        : 
DEFAULT_MAX_BUFFERS_PER_GATE_FOR_BATCH);
+        checkState(requiredMaxBuffers >= 0, "Max required buffers per gate 
must be non-negative.");
+        return requiredMaxBuffers;
+    }
+
+    private static int getNumBuffersToAnnounceForInputGates(
+            ResultPartitionType type,
+            int configuredNetworkBuffersPerChannel,
+            int floatingNetworkBuffersPerGate,
+            Optional<Integer> requiredMaxBuffersPerGate,
+            int numInputChannels) {
+        int maxGateBuffersThreshold = maxRequiredBuffersPerGate(type, 
requiredMaxBuffersPerGate);
+        int expectMaxGateBuffers =
+                getExpectMaxBuffersPerGate(
+                        numInputChannels,
+                        configuredNetworkBuffersPerChannel,
+                        floatingNetworkBuffersPerGate);
+
+        int actualExclusiveBuffersPerChannel =
+                adjustExclusiveBuffersPerChannel(
+                        configuredNetworkBuffersPerChannel,
+                        numInputChannels,
+                        maxGateBuffersThreshold);
+        int actualExclusiveBuffersPerGate =
+                getActualExclusiveBuffersInGate(numInputChannels, 
actualExclusiveBuffersPerChannel);
+        int maxFloatingBuffers = Math.max(1, expectMaxGateBuffers - 
actualExclusiveBuffersPerGate);
+
+        return actualExclusiveBuffersPerGate + maxFloatingBuffers;
+    }
+
+    public static boolean isPipelineResultPartition(ResultPartitionType 
partitionType) {
+        return partitionType.isPipelinedOrPipelinedBoundedResultPartition()
+                || partitionType == ResultPartitionType.PIPELINED_APPROXIMATE;
+    }
+
+    /**
+     * Adjusting the exclusive network buffers based on whether the total 
exclusive buffers in one
+     * gate has exceeded the gate buffer threshold.
+     *
+     * @return Adjusted buffers or -1. Return -1 if and only if the total 
exclusive buffers will
+     *     exceed the gate buffer threshold though the exclusive network 
buffers per channel is 1.
+     *     Returning -1 indicates all reading buffers in gate should use 
floating buffers.
+     */
+    public static int adjustExclusiveBuffersPerChannel(
+            int numBuffersPerChannel, int numInputChannels, int 
maxGateBuffersThreshold) {
+        checkArgument(numInputChannels > 0, "Must be positive.");
+        return Math.min(numBuffersPerChannel, maxGateBuffersThreshold / 
numInputChannels);
+    }
+
+    public static int getExpectMinBuffersPerGate(
+            int numInputChannels, int configuredNetworkBuffersPerChannel) {
+        return numInputChannels * configuredNetworkBuffersPerChannel + 1;
+    }
+
+    public static int getExpectMaxBuffersPerGate(
+            int numInputChannels,
+            int configuredNetworkBuffersPerChannel,
+            int configuredFloatingBuffersPerGate) {
+        return Math.max(
+                getExpectMinBuffersPerGate(numInputChannels, 
configuredNetworkBuffersPerChannel),
+                numInputChannels * configuredNetworkBuffersPerChannel
+                        + configuredFloatingBuffersPerGate);

Review Comment:
   ```suggestion
           return numInputChannels * configuredNetworkBuffersPerChannel
                   + Math.max(1, configuredFloatingBuffersPerGate);
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleUtils.java:
##########
@@ -119,6 +142,93 @@ public static int computeNetworkBuffersForAnnouncing(
         return requirementForInputs + requirementForOutputs;
     }
 
+    public static int maxRequiredBuffersPerGate(
+            ResultPartitionType partitionType, Optional<Integer> 
requiredMaxBuffersPerGate) {
+        int requiredMaxBuffers =

Review Comment:
   ```suggestion
           int maxRequiredBuffers =
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleUtils.java:
##########
@@ -119,6 +142,93 @@ public static int computeNetworkBuffersForAnnouncing(
         return requirementForInputs + requirementForOutputs;
     }
 
+    public static int maxRequiredBuffersPerGate(
+            ResultPartitionType partitionType, Optional<Integer> 
requiredMaxBuffersPerGate) {
+        int requiredMaxBuffers =
+                requiredMaxBuffersPerGate.orElseGet(
+                        () ->
+                                isPipelineResultPartition(partitionType)
+                                        ? 
DEFAULT_MAX_BUFFERS_PER_GATE_FOR_STREAM
+                                        : 
DEFAULT_MAX_BUFFERS_PER_GATE_FOR_BATCH);
+        checkState(requiredMaxBuffers >= 0, "Max required buffers per gate 
must be non-negative.");
+        return requiredMaxBuffers;
+    }
+
+    private static int getNumBuffersToAnnounceForInputGates(
+            ResultPartitionType type,
+            int configuredNetworkBuffersPerChannel,
+            int floatingNetworkBuffersPerGate,
+            Optional<Integer> requiredMaxBuffersPerGate,
+            int numInputChannels) {
+        int maxGateBuffersThreshold = maxRequiredBuffersPerGate(type, 
requiredMaxBuffersPerGate);
+        int expectMaxGateBuffers =
+                getExpectMaxBuffersPerGate(
+                        numInputChannels,
+                        configuredNetworkBuffersPerChannel,
+                        floatingNetworkBuffersPerGate);
+
+        int actualExclusiveBuffersPerChannel =
+                adjustExclusiveBuffersPerChannel(
+                        configuredNetworkBuffersPerChannel,
+                        numInputChannels,
+                        maxGateBuffersThreshold);
+        int actualExclusiveBuffersPerGate =
+                getActualExclusiveBuffersInGate(numInputChannels, 
actualExclusiveBuffersPerChannel);
+        int maxFloatingBuffers = Math.max(1, expectMaxGateBuffers - 
actualExclusiveBuffersPerGate);
+
+        return actualExclusiveBuffersPerGate + maxFloatingBuffers;
+    }
+
+    public static boolean isPipelineResultPartition(ResultPartitionType 
partitionType) {
+        return partitionType.isPipelinedOrPipelinedBoundedResultPartition()
+                || partitionType == ResultPartitionType.PIPELINED_APPROXIMATE;
+    }
+
+    /**
+     * Adjusting the exclusive network buffers based on whether the total 
exclusive buffers in one
+     * gate has exceeded the gate buffer threshold.
+     *
+     * @return Adjusted buffers or -1. Return -1 if and only if the total 
exclusive buffers will
+     *     exceed the gate buffer threshold though the exclusive network 
buffers per channel is 1.
+     *     Returning -1 indicates all reading buffers in gate should use 
floating buffers.
+     */
+    public static int adjustExclusiveBuffersPerChannel(
+            int numBuffersPerChannel, int numInputChannels, int 
maxGateBuffersThreshold) {
+        checkArgument(numInputChannels > 0, "Must be positive.");
+        return Math.min(numBuffersPerChannel, maxGateBuffersThreshold / 
numInputChannels);

Review Comment:
   ```suggestion
           return Math.min(numBuffersPerChannel, (maxGateBuffersThreshold - 1) 
/ numInputChannels);
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java:
##########
@@ -293,20 +325,79 @@ protected InputChannel createKnownInputChannel(
                     connectionManager,
                     partitionRequestInitialBackoff,
                     partitionRequestMaxBackoff,
-                    networkBuffersPerChannel,
+                    buffersPerChannel,
                     metrics);
         }
     }
 
     @VisibleForTesting
     static SupplierWithException<BufferPool, IOException> 
createBufferPoolFactory(
-            BufferPoolFactory bufferPoolFactory, int 
floatingNetworkBuffersPerGate) {
-        Pair<Integer, Integer> pair =
-                NettyShuffleUtils.getMinMaxFloatingBuffersPerInputGate(
-                        floatingNetworkBuffersPerGate);
+            BufferPoolFactory bufferPoolFactory,
+            int minFloatingBuffersPerGate,
+            int maxFloatingBuffersPerGate) {
+        Pair<Integer, Integer> pair = Pair.of(minFloatingBuffersPerGate, 
maxFloatingBuffersPerGate);
         return () -> bufferPoolFactory.createBufferPool(pair.getLeft(), 
pair.getRight());
     }
 
+    /**
+     * Based on whether the used exclusive buffers exceed the threshold, 
decide whether all buffers
+     * in the gate use Floating Buffers.
+     *
+     * <p>The threshold is configured by {@link
+     * 
NettyShuffleEnvironmentOptions#NETWORK_READ_BUFFERS_REQUIRED_PER_GATE_MAX}. If 
the option is
+     * not configured, the threshold for Blocking jobs is {@link
+     * NettyShuffleUtils#DEFAULT_MAX_BUFFERS_PER_GATE_FOR_BATCH} and the 
threshold for Streaming
+     * jobs is {#link 
NettyShuffleUtils#DEFAULT_MAX_BUFFERS_PER_GATE_FOR_STREAMING}.
+     */
+    protected class GateBuffersNumCalculator {
+
+        private final int minFloatingBuffers;
+
+        private final int maxFloatingBuffers;
+
+        private final int actualExclusiveBuffersPerChannel;
+
+        GateBuffersNumCalculator(ResultPartitionType partitionType, int 
numInputChannels) {
+            int maxGateBuffersThreshold =
+                    maxRequiredBuffersPerGate(partitionType, 
requiredMaxBuffersPerGate);
+            int expectMinGateBuffers =
+                    getExpectMinBuffersPerGate(
+                            numInputChannels, 
configuredNetworkBuffersPerChannel);
+            int expectMaxGateBuffers =
+                    getExpectMaxBuffersPerGate(
+                            numInputChannels,
+                            configuredNetworkBuffersPerChannel,
+                            floatingNetworkBuffersPerGate);
+            int minGateBuffers = Math.min(maxGateBuffersThreshold, 
expectMinGateBuffers);
+
+            this.actualExclusiveBuffersPerChannel =
+                    adjustExclusiveBuffersPerChannel(
+                            configuredNetworkBuffersPerChannel,
+                            numInputChannels,
+                            maxGateBuffersThreshold);

Review Comment:
   Might be better to use `minGateBuffers` here.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java:
##########
@@ -293,20 +325,79 @@ protected InputChannel createKnownInputChannel(
                     connectionManager,
                     partitionRequestInitialBackoff,
                     partitionRequestMaxBackoff,
-                    networkBuffersPerChannel,
+                    buffersPerChannel,
                     metrics);
         }
     }
 
     @VisibleForTesting
     static SupplierWithException<BufferPool, IOException> 
createBufferPoolFactory(
-            BufferPoolFactory bufferPoolFactory, int 
floatingNetworkBuffersPerGate) {
-        Pair<Integer, Integer> pair =
-                NettyShuffleUtils.getMinMaxFloatingBuffersPerInputGate(
-                        floatingNetworkBuffersPerGate);
+            BufferPoolFactory bufferPoolFactory,
+            int minFloatingBuffersPerGate,
+            int maxFloatingBuffersPerGate) {
+        Pair<Integer, Integer> pair = Pair.of(minFloatingBuffersPerGate, 
maxFloatingBuffersPerGate);
         return () -> bufferPoolFactory.createBufferPool(pair.getLeft(), 
pair.getRight());
     }
 
+    /**
+     * Based on whether the used exclusive buffers exceed the threshold, 
decide whether all buffers
+     * in the gate use Floating Buffers.
+     *
+     * <p>The threshold is configured by {@link
+     * 
NettyShuffleEnvironmentOptions#NETWORK_READ_BUFFERS_REQUIRED_PER_GATE_MAX}. If 
the option is
+     * not configured, the threshold for Blocking jobs is {@link
+     * NettyShuffleUtils#DEFAULT_MAX_BUFFERS_PER_GATE_FOR_BATCH} and the 
threshold for Streaming
+     * jobs is {#link 
NettyShuffleUtils#DEFAULT_MAX_BUFFERS_PER_GATE_FOR_STREAMING}.
+     */
+    protected class GateBuffersNumCalculator {
+
+        private final int minFloatingBuffers;
+
+        private final int maxFloatingBuffers;
+
+        private final int actualExclusiveBuffersPerChannel;
+
+        GateBuffersNumCalculator(ResultPartitionType partitionType, int 
numInputChannels) {
+            int maxGateBuffersThreshold =
+                    maxRequiredBuffersPerGate(partitionType, 
requiredMaxBuffersPerGate);
+            int expectMinGateBuffers =
+                    getExpectMinBuffersPerGate(
+                            numInputChannels, 
configuredNetworkBuffersPerChannel);
+            int expectMaxGateBuffers =
+                    getExpectMaxBuffersPerGate(
+                            numInputChannels,
+                            configuredNetworkBuffersPerChannel,
+                            floatingNetworkBuffersPerGate);
+            int minGateBuffers = Math.min(maxGateBuffersThreshold, 
expectMinGateBuffers);
+
+            this.actualExclusiveBuffersPerChannel =
+                    adjustExclusiveBuffersPerChannel(
+                            configuredNetworkBuffersPerChannel,
+                            numInputChannels,
+                            maxGateBuffersThreshold);
+
+            int actualExclusiveBuffersPerGate =
+                    getActualExclusiveBuffersInGate(
+                            numInputChannels, 
actualExclusiveBuffersPerChannel);
+            this.minFloatingBuffers = Math.max(1, minGateBuffers - 
actualExclusiveBuffersPerGate);

Review Comment:
   We should not allow `minGateBuffers - actualExclusiveBuffersPerGate < 1`.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleUtils.java:
##########
@@ -35,6 +38,10 @@
  */
 public class NettyShuffleUtils {
 
+    public static final int DEFAULT_MAX_BUFFERS_PER_GATE_FOR_BATCH = 1000;
+
+    public static final int DEFAULT_MAX_BUFFERS_PER_GATE_FOR_STREAM = 
Integer.MAX_VALUE;

Review Comment:
   ```suggestion
       public static final int DEFAULT_MAX_REQUIRED_BUFFERS_PER_GATE_FOR_BATCH 
= 1000;
   
       public static final int DEFAULT_MAX_REQUIRED_BUFFERS_PER_GATE_FOR_STREAM 
= Integer.MAX_VALUE;
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleUtils.java:
##########
@@ -86,25 +93,41 @@ public static Pair<Integer, Integer> 
getMinMaxNetworkBuffersPerResultPartition(
     public static int computeNetworkBuffersForAnnouncing(
             final int numBuffersPerChannel,
             final int numFloatingBuffersPerGate,
+            final Optional<Integer> requiredMaxBuffersPerGate,
             final int sortShuffleMinParallelism,
             final int sortShuffleMinBuffers,
-            final int numTotalInputChannels,
             final int numTotalInputGates,
+            final Map<IntermediateDataSetID, Integer> inputChannelNums,
             final Map<IntermediateDataSetID, Integer> subpartitionNums,
+            final Map<IntermediateDataSetID, ResultPartitionType> 
inputPartitionTypes,
             final Map<IntermediateDataSetID, ResultPartitionType> 
partitionTypes) {
 
-        // Each input channel will retain N exclusive network buffers, N = 
numBuffersPerChannel.
-        // Each input gate is guaranteed to have a number of floating buffers.
-        int requirementForInputs =
-                getNetworkBuffersPerInputChannel(numBuffersPerChannel) * 
numTotalInputChannels
-                        + 
getMinMaxFloatingBuffersPerInputGate(numFloatingBuffersPerGate).getRight()
-                                * numTotalInputGates;
+        int requirementForInputs = 0;
+        int maxSingleGateBuffers = 0;
+        for (IntermediateDataSetID dataSetId : inputChannelNums.keySet()) {
+            int numChannels = inputChannelNums.get(dataSetId);
+            ResultPartitionType inputPartitionType = 
inputPartitionTypes.get(dataSetId);
+            checkNotNull(inputPartitionType);
+
+            int numSingleGateBuffers =
+                    getNumBuffersToAnnounceForInputGates(
+                            inputPartitionType,
+                            numBuffersPerChannel,
+                            numFloatingBuffersPerGate,
+                            requiredMaxBuffersPerGate,
+                            numChannels);
+            requirementForInputs += numSingleGateBuffers;
+            maxSingleGateBuffers = Math.max(maxSingleGateBuffers, 
numSingleGateBuffers);
+        }
+        requirementForInputs +=
+                getReusePartitionInputBuffers(
+                        numTotalInputGates, inputChannelNums, 
maxSingleGateBuffers);

Review Comment:
   What's this for?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to