mxm commented on code in PR #782:
URL:
https://github.com/apache/flink-kubernetes-operator/pull/782#discussion_r1500530308
##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/tuning/MemoryTuning.java:
##########
@@ -277,6 +286,32 @@ private static MemorySize adjustNetworkMemory(
return new MemorySize(memBudget.budget(maxNetworkMemory));
}
+ /**
+ * Calculate how many network segment current vertex needs.
+ *
+ * @param currentVertexParallelism The parallelism of current vertex.
+ * @param otherVertexParallelism The parallelism of other vertex.
+ */
+ @VisibleForTesting
+ static int calculateNetworkSegmentNumber(
+ int currentVertexParallelism,
+ int otherVertexParallelism,
+ String shipStrategy,
+ int buffersPerChannel,
+ int floatingBuffers) {
+ // TODO When the parallelism is changed via the rescale api, the FORWARD may be changed to
+ // RESCALE. This logic may needs to be updated after FLINK-33123.
+ if (currentVertexParallelism == otherVertexParallelism && "FORWARD".equals(shipStrategy)) {
+ return buffersPerChannel + floatingBuffers;
+ } else if ("FORWARD".equals(shipStrategy) || "RESCALE".equals(shipStrategy)) {
+ final int channelCount =
+ (int) Math.ceil(1.0d * otherVertexParallelism / currentVertexParallelism);
Review Comment:
I prefer the following to avoid adding another constant.
```suggestion
(int) Math.ceil(otherVertexParallelism / (double) currentVertexParallelism);
```
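To illustrate why either form works, here is a minimal, self-contained sketch (the class and method names are hypothetical, not part of the PR). It compares the `1.0d *` form, the `(double)` cast form, a plain integer division that truncates before `Math.ceil` sees the value, and an integer-only alternative that assumes positive operands small enough that the sum does not overflow.

```java
final class CeilDivExample {

    // Form used in the PR: the 1.0d factor promotes the division to double.
    static int viaMultiplication(int other, int current) {
        return (int) Math.ceil(1.0d * other / current);
    }

    // Form from the suggestion: the cast promotes the division to double.
    static int viaCast(int other, int current) {
        return (int) Math.ceil(other / (double) current);
    }

    // Pitfall: integer division truncates first, so Math.ceil has nothing left to round up.
    static int truncating(int other, int current) {
        return (int) Math.ceil(other / current);
    }

    // Integer-only alternative; assumes both arguments are positive and the sum fits in an int.
    static int viaIntegerMath(int other, int current) {
        return (other + current - 1) / current;
    }

    public static void main(String[] args) {
        System.out.println(viaMultiplication(7, 2)); // 4
        System.out.println(viaCast(7, 2));           // 4
        System.out.println(truncating(7, 2));        // 3
        System.out.println(viaIntegerMath(7, 2));    // 4
    }
}
```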
##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/tuning/MemoryTuning.java:
##########
@@ -243,28 +244,36 @@ private static MemorySize adjustNetworkMemory(
Configuration config,
MemoryBudget memBudget) {
- final long buffersPerChannel =
+ final int buffersPerChannel =
config.get(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_PER_CHANNEL);
- final long floatingBuffers =
+ final int floatingBuffers =
config.get(NettyShuffleEnvironmentOptions.NETWORK_EXTRA_BUFFERS_PER_GATE);
final long memorySegmentBytes =
config.get(TaskManagerOptions.MEMORY_SEGMENT_SIZE).getBytes();
long maxNetworkMemory = 0;
for (VertexInfo vertexInfo : jobTopology.getVertexInfos().values()) {
// Add max amount of memory for each input gate
- for (JobVertexID input : vertexInfo.getInputs()) {
- int inputParallelism = updatedParallelisms.get(input);
+ for (Map.Entry<JobVertexID, String> inputEntry : vertexInfo.getInputs().entrySet()) {
maxNetworkMemory +=
- (inputParallelism * buffersPerChannel + floatingBuffers)
+ calculateNetworkSegmentNumber(
+ updatedParallelisms.get(vertexInfo.getId()),
+ updatedParallelisms.get(inputEntry.getKey()),
+ inputEntry.getValue(),
Review Comment:
For readability, can we add a variable?
```suggestion
shipStrategy,
```
##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/tuning/MemoryTuning.java:
##########
@@ -277,6 +286,32 @@ private static MemorySize adjustNetworkMemory(
return new MemorySize(memBudget.budget(maxNetworkMemory));
}
+ /**
+ * Calculate how many network segment current vertex needs.
+ *
+ * @param currentVertexParallelism The parallelism of current vertex.
+ * @param otherVertexParallelism The parallelism of other vertex.
+ */
+ @VisibleForTesting
+ static int calculateNetworkSegmentNumber(
+ int currentVertexParallelism,
+ int otherVertexParallelism,
+ String shipStrategy,
+ int buffersPerChannel,
+ int floatingBuffers) {
+ // TODO When the parallelism is changed via the rescale api, the FORWARD may be changed to
+ // RESCALE. This logic may needs to be updated after FLINK-33123.
+ if (currentVertexParallelism == otherVertexParallelism && "FORWARD".equals(shipStrategy)) {
+ return buffersPerChannel + floatingBuffers;
+ } else if ("FORWARD".equals(shipStrategy) || "RESCALE".equals(shipStrategy)) {
Review Comment:
The shipStrategy in the JSON plan comes from
https://github.com/apache/flink/blob/6c8f3a0799c609d8076f782e5334e389e4d92eee/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java#L1433.
This is a bit concerning because any custom partitioner whose `toString()`
implementation returns FORWARD would run into this code branch. Although
unlikely, that would break the network memory code.
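A minimal sketch of the collision, using hypothetical stand-in classes rather than Flink's partitioner types: the string check cannot tell a real forward partitioner from a custom one that happens to print the same name, while a type check can. Note that the autoscaler only sees the string from the JSON plan, so the type-based check is shown for contrast only and is not available at that point.

```java
final class ShipStrategyCollision {

    /** Hypothetical stand-in for the built-in forward partitioner (not the Flink class). */
    static final class ForwardPartitionerStandIn {
        @Override
        public String toString() {
            return "FORWARD";
        }
    }

    /** Hypothetical user-defined partitioner that happens to print the same name. */
    static final class MyCustomPartitioner {
        @Override
        public String toString() {
            return "FORWARD";
        }
    }

    /** Mirrors the string comparison in the diff. */
    static boolean looksLikeForward(Object partitioner) {
        return "FORWARD".equals(partitioner.toString());
    }

    /** Type-based check, for contrast; only possible where the partitioner object is at hand. */
    static boolean isForward(Object partitioner) {
        return partitioner instanceof ForwardPartitionerStandIn;
    }

    public static void main(String[] args) {
        Object custom = new MyCustomPartitioner();
        System.out.println(looksLikeForward(custom)); // true, the string check is fooled
        System.out.println(isForward(custom));        // false
    }
}
```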
##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/VertexInfo.java:
##########
@@ -22,17 +22,19 @@
import lombok.Data;
-import java.util.Set;
+import java.util.Map;
/** Job vertex information. */
@Data
public class VertexInfo {
private final JobVertexID id;
- private final Set<JobVertexID> inputs;
+ // All input vertices and the ship_strategy
+ private final Map<JobVertexID, String> inputs;
- private Set<JobVertexID> outputs;
+ // All output vertices and the ship_strategy
+ private Map<JobVertexID, String> outputs;
Review Comment:
This is a bit of a hack. Fine for now, but I would prefer keeping the Set and
using a wrapper class that contains the JobVertexID and a field for the
ship_strategy.
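Roughly what I have in mind, as a sketch only: `Connection` is a hypothetical name, and `String` stands in for `JobVertexID` so the example compiles on its own. In the real class, Lombok could generate the accessors and `equals`/`hashCode`.

```java
import java.util.LinkedHashSet;
import java.util.Objects;
import java.util.Set;

final class VertexInputSketch {

    /** Hypothetical wrapper pairing a vertex id with the ship strategy of the edge. */
    static final class Connection {
        private final String vertexId; // stand-in for JobVertexID
        private final String shipStrategy;

        Connection(String vertexId, String shipStrategy) {
            this.vertexId = Objects.requireNonNull(vertexId);
            this.shipStrategy = Objects.requireNonNull(shipStrategy);
        }

        String getVertexId() {
            return vertexId;
        }

        String getShipStrategy() {
            return shipStrategy;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof Connection)) {
                return false;
            }
            Connection other = (Connection) o;
            return vertexId.equals(other.vertexId) && shipStrategy.equals(other.shipStrategy);
        }

        @Override
        public int hashCode() {
            return Objects.hash(vertexId, shipStrategy);
        }
    }

    public static void main(String[] args) {
        // The field stays a Set; each element carries its own ship strategy.
        Set<Connection> inputs = new LinkedHashSet<>();
        inputs.add(new Connection("source", "FORWARD"));
        inputs.add(new Connection("map", "HASH"));
        for (Connection input : inputs) {
            System.out.println(input.getVertexId() + " -> " + input.getShipStrategy());
        }
    }
}
```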
##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/tuning/MemoryTuning.java:
##########
@@ -277,6 +286,32 @@ private static MemorySize adjustNetworkMemory(
return new MemorySize(memBudget.budget(maxNetworkMemory));
}
+ /**
+ * Calculate how many network segment current vertex needs.
+ *
+ * @param currentVertexParallelism The parallelism of current vertex.
+ * @param otherVertexParallelism The parallelism of other vertex.
+ */
+ @VisibleForTesting
+ static int calculateNetworkSegmentNumber(
+ int currentVertexParallelism,
+ int otherVertexParallelism,
Review Comment:
```suggestion
int upstreamVertexParallelism,
int downstreamVertexParallelism,
```
##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/tuning/MemoryTuning.java:
##########
@@ -243,28 +244,36 @@ private static MemorySize adjustNetworkMemory(
Configuration config,
MemoryBudget memBudget) {
- final long buffersPerChannel =
+ final int buffersPerChannel =
config.get(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_PER_CHANNEL);
- final long floatingBuffers =
+ final int floatingBuffers =
Review Comment:
This was a long because it is converted to long anyway each time it is
multiplied by the memory segment size.
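For context, a small sketch of the widening behaviour (class, method names, and numbers are hypothetical). In the diff `memorySegmentBytes` is a long, so an int count is widened at every multiplication and the result is correct. The hazard only appears if both operands end up as int, in which case the product wraps before it is widened.

```java
final class SegmentBytesExample {

    // Both operands are int: the multiplication happens in 32 bits and can wrap
    // before the result is widened to long.
    static long bothInt(int segments, int segmentBytes) {
        return segments * segmentBytes;
    }

    // One operand is long: the int operand is widened first, so the product is exact.
    static long oneLong(int segments, long segmentBytes) {
        return segments * segmentBytes;
    }

    public static void main(String[] args) {
        int segments = 100_000;      // hypothetical segment count
        int segmentBytes = 32_768;   // hypothetical 32 KiB segment size

        System.out.println(bothInt(segments, segmentBytes)); // negative, the int product wrapped
        System.out.println(oneLong(segments, segmentBytes)); // 3276800000
    }
}
```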
##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/tuning/MemoryTuning.java:
##########
@@ -277,6 +286,32 @@ private static MemorySize adjustNetworkMemory(
return new MemorySize(memBudget.budget(maxNetworkMemory));
}
+ /**
+ * Calculate how many network segment current vertex needs.
+ *
+ * @param currentVertexParallelism The parallelism of current vertex.
+ * @param otherVertexParallelism The parallelism of other vertex.
+ */
+ @VisibleForTesting
+ static int calculateNetworkSegmentNumber(
+ int currentVertexParallelism,
+ int otherVertexParallelism,
Review Comment:
Can we change the logic to use these consistently? I think that makes
reading the code a lot easier. Currently, the parameters change based on the
ship strategy.