1996fanrui commented on code in PR #782:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/782#discussion_r1500604384


##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/tuning/MemoryTuning.java:
##########
@@ -277,6 +286,32 @@ private static MemorySize adjustNetworkMemory(
         return new MemorySize(memBudget.budget(maxNetworkMemory));
     }
 
+    /**
+     * Calculate how many network segment current vertex needs.
+     *
+     * @param currentVertexParallelism The parallelism of current vertex.
+     * @param otherVertexParallelism The parallelism of other vertex.
+     */
+    @VisibleForTesting
+    static int calculateNetworkSegmentNumber(
+            int currentVertexParallelism,
+            int otherVertexParallelism,

Review Comment:
   IIUC, if we using the `upstream` and `downstream`, we still need to know 
which one is currentVertex, which one is otherVertex. For example, rebalance 
type consider the `otherVertexParallelism` as the channelCount.
   
   Please correct me if my understanding is wrong or I miss anything, thank you



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/tuning/MemoryTuning.java:
##########
@@ -277,6 +286,32 @@ private static MemorySize adjustNetworkMemory(
         return new MemorySize(memBudget.budget(maxNetworkMemory));
     }
 
+    /**
+     * Calculate how many network segment current vertex needs.
+     *
+     * @param currentVertexParallelism The parallelism of current vertex.
+     * @param otherVertexParallelism The parallelism of other vertex.
+     */
+    @VisibleForTesting
+    static int calculateNetworkSegmentNumber(
+            int currentVertexParallelism,
+            int otherVertexParallelism,
+            String shipStrategy,
+            int buffersPerChannel,
+            int floatingBuffers) {
+        // TODO When the parallelism is changed via the rescale api, the 
FORWARD may be changed to
+        // RESCALE. This logic may needs to be updated after FLINK-33123.
+        if (currentVertexParallelism == otherVertexParallelism && 
"FORWARD".equals(shipStrategy)) {
+            return buffersPerChannel + floatingBuffers;
+        } else if ("FORWARD".equals(shipStrategy) || 
"RESCALE".equals(shipStrategy)) {

Review Comment:
   It comes from 
`org.apache.flink.streaming.runtime.partitioner.StreamPartitioner#toString` 
instead of `org.apache.flink.api.common.functions.Partitioner`.
   
   IIUC, users implement `org.apache.flink.api.common.functions.Partitioner`. 
And `CustomPartitionerWrapper` wraps the `Partitioner`. The 
`CustomPartitionerWrapper#toString` is `CUSTOM`.
   
   I have wrote a demo, and overwrite the toString of custom `Partitioner`. It 
still shows `CUSTOM`.



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/VertexInfo.java:
##########
@@ -22,17 +22,19 @@
 
 import lombok.Data;
 
-import java.util.Set;
+import java.util.Map;
 
 /** Job vertex information. */
 @Data
 public class VertexInfo {
 
     private final JobVertexID id;
 
-    private final Set<JobVertexID> inputs;
+    // All input vertices and the ship_strategy
+    private final Map<JobVertexID, String> inputs;
 
-    private Set<JobVertexID> outputs;
+    // All output vertices and the ship_strategy
+    private Map<JobVertexID, String> outputs;

Review Comment:
   `Set<Class<JobVertexID, String>>` has a semantic difference compared to 
`Map<JobVertexID, String>`. It is possible that there are multiple 
ship_strategies with the same JobVertexID.
   
   I don't have strong opinion for this. I'm opening for it.



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/tuning/MemoryTuning.java:
##########
@@ -243,28 +244,36 @@ private static MemorySize adjustNetworkMemory(
             Configuration config,
             MemoryBudget memBudget) {
 
-        final long buffersPerChannel =
+        final int buffersPerChannel =
                 
config.get(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_PER_CHANNEL);
-        final long floatingBuffers =
+        final int floatingBuffers =

Review Comment:
   Currently, all types in `calculateNetworkSegmentNumber` is int, including: 
`currentVertexParallelism`, `otherVertexParallelism`, `buffersPerChannel` and 
`floatingBuffers`.
   
   If we change `buffersPerChannel` and `floatingBuffers` to  long, and don't 
change `currentVertexParallelism`, `otherVertexParallelism`. It still needs to 
convert int to long.
   
   How about keeping it as is? Inside of calculateNetworkSegmentNumber is int, 
we only need to convert int to long when ` * memorySegmentBytes`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to