TanYuxin-tyx commented on code in PR #21620:
URL: https://github.com/apache/flink/pull/21620#discussion_r1065925434
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java:
##########
@@ -293,20 +324,74 @@ protected InputChannel createKnownInputChannel(
connectionManager,
partitionRequestInitialBackoff,
partitionRequestMaxBackoff,
- networkBuffersPerChannel,
+ buffersPerChannel,
metrics);
}
}
@VisibleForTesting
static SupplierWithException<BufferPool, IOException>
createBufferPoolFactory(
- BufferPoolFactory bufferPoolFactory, int
floatingNetworkBuffersPerGate) {
- Pair<Integer, Integer> pair =
- NettyShuffleUtils.getMinMaxFloatingBuffersPerInputGate(
- floatingNetworkBuffersPerGate);
+ BufferPoolFactory bufferPoolFactory,
+ int minFloatingBuffersPerGate,
+ int maxFloatingBuffersPerGate) {
+ Pair<Integer, Integer> pair = Pair.of(minFloatingBuffersPerGate,
maxFloatingBuffersPerGate);
return () -> bufferPoolFactory.createBufferPool(pair.getLeft(),
pair.getRight());
}
+ /**
+ * Based on whether the used exclusive buffers exceed the threshold,
decide whether all buffers
+ * in the gate use Floating Buffers.
+ *
+ * <p>The threshold is configured by {@link
+ *
NettyShuffleEnvironmentOptions#NETWORK_READ_BUFFERS_REQUIRED_PER_GATE_MAX}. If
the option is
+ * not configured, the threshold for Blocking jobs is {@link
+ * NettyShuffleUtils#DEFAULT_MAX_BUFFERS_PER_GATE_FOR_BLOCKING} and the
threshold for Streaming
+ * jobs is {@link
NettyShuffleUtils#DEFAULT_MAX_BUFFERS_PER_GATE_FOR_STREAM}.
+ */
+ protected class GateBuffersNumCalculator {
+
+ private final int minFloatingBuffers;
+
+ private final int maxFloatingBuffers;
+
+ private final int exclusiveBuffersPerChannel;
+
+ GateBuffersNumCalculator(ResultPartitionType partitionType, int
numInputChannels) {
+ int maxGateBuffersThreshold =
+ maxRequiredBuffersPerGate(
+ partitionType,
+ isGateRequiredMaxBuffersConfigured,
+ requiredMaxBuffersPerGate);
+
+ int adjustedBuffersPerChannel =
+ adjustExclusiveBuffersPerChannel(
+ networkBuffersPerChannel, numInputChannels,
maxGateBuffersThreshold);
+ boolean useFloatingBuffer = adjustedBuffersPerChannel < 0;
+
+ this.minFloatingBuffers = useFloatingBuffer ?
maxGateBuffersThreshold : 1;
Review Comment:
Fixed with new calculation logic.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleUtils.java:
##########
@@ -119,6 +142,95 @@ public static int computeNetworkBuffersForAnnouncing(
return requirementForInputs + requirementForOutputs;
}
+ public static int maxRequiredBuffersPerGate(
+ ResultPartitionType partitionType,
+ boolean isGateRequiredMaxBuffersConfigured,
+ int requiredMaxBuffersPerGate) {
+ int requiredMaxBuffers;
+ if (isGateRequiredMaxBuffersConfigured) {
+ requiredMaxBuffers = requiredMaxBuffersPerGate;
+ } else {
+ requiredMaxBuffers =
+
partitionType.isBlockingOrBlockingPersistentResultPartition()
+ ? DEFAULT_MAX_BUFFERS_PER_GATE_FOR_BLOCKING
+ : DEFAULT_MAX_BUFFERS_PER_GATE_FOR_STREAM;
+ }
+ checkState(requiredMaxBuffers >= 0, "Max required buffers per gate
must be non-negative.");
+ return requiredMaxBuffers;
+ }
+
+ public static int getMaxFloatingBuffersInGate(
+ int numInputChannels, int networkBuffersPerChannel, int
floatingNetworkBuffersPerGate) {
+ return getExclusiveBuffersInGate(numInputChannels,
networkBuffersPerChannel)
+ + floatingNetworkBuffersPerGate;
+ }
Review Comment:
Fixed with new calculation logic.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]