TanYuxin-tyx commented on code in PR #21620:
URL: https://github.com/apache/flink/pull/21620#discussion_r1065924786
##########
flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleUtils.java:
##########
@@ -119,6 +142,95 @@ public static int computeNetworkBuffersForAnnouncing(
return requirementForInputs + requirementForOutputs;
}
+ public static int maxRequiredBuffersPerGate(
+ ResultPartitionType partitionType,
+ boolean isGateRequiredMaxBuffersConfigured,
+ int requiredMaxBuffersPerGate) {
+ int requiredMaxBuffers;
+ if (isGateRequiredMaxBuffersConfigured) {
+ requiredMaxBuffers = requiredMaxBuffersPerGate;
+ } else {
+ requiredMaxBuffers =
+
partitionType.isBlockingOrBlockingPersistentResultPartition()
+ ? DEFAULT_MAX_BUFFERS_PER_GATE_FOR_BLOCKING
+ : DEFAULT_MAX_BUFFERS_PER_GATE_FOR_STREAM;
+ }
+ checkState(requiredMaxBuffers >= 0, "Max required buffers per gate
must be non-negative.");
+ return requiredMaxBuffers;
+ }
+
+ public static int getMaxFloatingBuffersInGate(
+ int numInputChannels, int networkBuffersPerChannel, int
floatingNetworkBuffersPerGate) {
+ return getExclusiveBuffersInGate(numInputChannels,
networkBuffersPerChannel)
+ + floatingNetworkBuffersPerGate;
+ }
+
+ private static int getNumBuffersToAnnounceForInputGates(
+ ResultPartitionType type,
+ int numBuffersPerChannel,
+ int numFloatingBuffersPerGate,
+ boolean isGateRequiredMaxBuffersConfigured,
+ int requiredMaxBuffersPerGate,
+ int numInputChannels) {
+ int maxGateBuffersThreshold =
+ maxRequiredBuffersPerGate(
+ type, isGateRequiredMaxBuffersConfigured,
requiredMaxBuffersPerGate);
+
+ int adjustedBuffersPerChannel =
+ adjustExclusiveBuffersPerChannel(
+ numBuffersPerChannel, numInputChannels,
maxGateBuffersThreshold);
+ boolean useFloatingBuffer = adjustedBuffersPerChannel < 0;
+
+ int maxFloatingBuffers =
+ useFloatingBuffer
+ ? getMaxFloatingBuffersInGate(
+ numInputChannels, numBuffersPerChannel,
numFloatingBuffersPerGate)
+ : numFloatingBuffersPerGate;
+ int exclusiveBuffersPerChannel = useFloatingBuffer ? 0 :
adjustedBuffersPerChannel;
+
+ return exclusiveBuffersPerChannel * numInputChannels +
maxFloatingBuffers;
+ }
+
+ /**
+ * Adjusting the exclusive network buffers based on whether the total
exclusive buffers in one
+ * gate has exceeded the gate buffer threshold.
+ *
+ * @return Adjusted buffers or -1. Return -1 if and only if the total
exclusive buffers will
+ * exceed the gate buffer threshold though the exclusive network
buffers per channel is 1.
+ * Returning -1 indicates all reading buffers in gate should use
floating buffers.
+ */
+ public static int adjustExclusiveBuffersPerChannel(
+ int numBuffersPerChannel, int numInputChannels, int
maxGateBuffersThreshold) {
+ if (numBuffersPerChannel == 0) {
+ return 0;
+ }
+
+ int adjustedBuffersPerChannel = -1;
+ for (int i = numBuffersPerChannel; i > 0; i--) {
+ if (getExclusiveBuffersInGate(numInputChannels, i) <=
maxGateBuffersThreshold) {
+ adjustedBuffersPerChannel = i;
+ break;
+ }
+ }
+ return adjustedBuffersPerChannel;
+ }
Review Comment:
Ok, Fixed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]