zhuzhurk commented on code in PR #25552:
URL: https://github.com/apache/flink/pull/25552#discussion_r1908594787


##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/util/PointwiseVertexInputInfoComputer.java:
##########
@@ -46,30 +44,34 @@ public class PointwiseVertexInputInfoComputer {
     private static final Logger LOG =
             LoggerFactory.getLogger(PointwiseVertexInputInfoComputer.class);
 
-    private final long dataVolumePerTask;
-
-    public PointwiseVertexInputInfoComputer(long dataVolumePerTask) {
-        this.dataVolumePerTask = dataVolumePerTask;
-    }
+    public PointwiseVertexInputInfoComputer() {}

Review Comment:
   It's fine to just remove this constructor.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/util/PointwiseVertexInputInfoComputer.java:
##########
@@ -46,30 +44,34 @@ public class PointwiseVertexInputInfoComputer {
     private static final Logger LOG =
             LoggerFactory.getLogger(PointwiseVertexInputInfoComputer.class);
 
-    private final long dataVolumePerTask;
-
-    public PointwiseVertexInputInfoComputer(long dataVolumePerTask) {
-        this.dataVolumePerTask = dataVolumePerTask;
-    }
+    public PointwiseVertexInputInfoComputer() {}
 
     /**
     * Computes the input information for a job vertex based on the provided blocking input
      * information and parallelism.
      *
      * @param inputInfos List of blocking input information for the job vertex.
      * @param parallelism Parallelism of the job vertex.
+     * @param dataVolumePerTask Proposed data volume per task for this set of inputInfo.
     * @return A map of intermediate data set IDs to their corresponding job vertex input
      *     information.
      */
     public Map<IntermediateDataSetID, JobVertexInputInfo> compute(
-            List<BlockingInputInfo> inputInfos, int parallelism) {
-        checkArgument(
-                inputInfos.stream().noneMatch(BlockingInputInfo::areInterInputsKeysCorrelated));
+            List<BlockingInputInfo> inputInfos, int parallelism, long dataVolumePerTask) {
+        long totalDataBytes =
+                inputInfos.stream().mapToLong(BlockingInputInfo::getNumBytesProduced).sum();
         Map<IntermediateDataSetID, JobVertexInputInfo> vertexInputInfos = new HashMap<>();
         for (BlockingInputInfo inputInfo : inputInfos) {
+            // Currently, we require that all inputs in this method have no inter-inputs key
+            // correlation. If other possibilities are introduced in the future, please add new
+            // branches to this method.
+            checkState(!inputInfo.areInterInputsKeysCorrelated());
             vertexInputInfos.put(
                     inputInfo.getResultId(),
-                    computeVertexInputInfo(inputInfo, parallelism, dataVolumePerTask));
+                    computeVertexInputInfo(
+                            inputInfo,
+                            parallelism,
+                            inputInfo.getNumBytesProduced() / totalDataBytes * dataVolumePerTask));

Review Comment:
   The result can be wrong because this is integer division: `inputInfo.getNumBytesProduced() / totalDataBytes` truncates to 0 whenever an input's bytes are smaller than the total. Maybe convert it to double or BigDecimal to do the calculation?
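   For example, the loop body could compute the ratio in double space before scaling (a rough sketch only, reusing the names from the diff above; the guard against a zero total is an added assumption):

```java
// Hypothetical rework of the proportional split: compute the ratio as a double
// so that small inputs are not truncated to 0 by integer division.
double ratio =
        totalDataBytes == 0 ? 0.0 : (double) inputInfo.getNumBytesProduced() / totalDataBytes;
long proposedDataVolumePerTask = (long) (ratio * dataVolumePerTask);
vertexInputInfos.put(
        inputInfo.getResultId(),
        computeVertexInputInfo(inputInfo, parallelism, proposedDataVolumePerTask));
```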



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismAndInputInfosDeciderTest.java:
##########
@@ -500,7 +521,14 @@ static DefaultVertexParallelismAndInputInfosDecider createDecider(
                BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_DEFAULT_SOURCE_PARALLELISM,
                 defaultSourceParallelism);
 
-        return DefaultVertexParallelismAndInputInfosDecider.from(maxParallelism, configuration);
+        return DefaultVertexParallelismAndInputInfosDecider.from(
+                maxParallelism,
+                BatchExecutionOptionsInternal.ADAPTIVE_SKEWED_OPTIMIZATION_SKEWED_FACTOR
+                        .defaultValue(),
+                BatchExecutionOptionsInternal.ADAPTIVE_SKEWED_OPTIMIZATION_SKEWED_THRESHOLD
+                        .defaultValue()
+                        .getBytes(),
+                configuration);

Review Comment:
   Could you introduce a method `createDefaultVertexParallelismAndInputInfosDecider(maxParallelism, configuration, skewedFactor, skewedThreshold)` to avoid such duplication?
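   Something along these lines, for instance (a sketch only; parameter types and the argument order of `from` are inferred from the call in the diff above):

```java
// Hypothetical test helper to centralize the decider construction used across tests.
private static DefaultVertexParallelismAndInputInfosDecider
        createDefaultVertexParallelismAndInputInfosDecider(
                int maxParallelism,
                Configuration configuration,
                double skewedFactor,
                long skewedThreshold) {
    return DefaultVertexParallelismAndInputInfosDecider.from(
            maxParallelism, skewedFactor, skewedThreshold, configuration);
}
```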



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/util/VertexParallelismAndInputInfosDeciderUtils.java:
##########
@@ -653,4 +643,13 @@ private static Map<IndexRange, IndexRange> reverseIndexRangeMap(
         }
         return reversedRangeMap;
     }
+
+    public static long calculateDataVolumePerTaskForInputsGroup(
+            long globalDataVolumePerTask,
+            List<BlockingInputInfo> inputsGroup,
+            List<BlockingInputInfo> wholeInputs) {
+        return inputsGroup.stream().mapToLong(BlockingInputInfo::getNumBytesProduced).sum()

Review Comment:
   Such a computation is not correct when using int/long types.
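   For example, the ratio could be computed as a double (a sketch only; since the quoted body is cut off above, taking the sum over `wholeInputs` as the denominator is my assumption):

```java
public static long calculateDataVolumePerTaskForInputsGroup(
        long globalDataVolumePerTask,
        List<BlockingInputInfo> inputsGroup,
        List<BlockingInputInfo> wholeInputs) {
    long groupBytes =
            inputsGroup.stream().mapToLong(BlockingInputInfo::getNumBytesProduced).sum();
    long totalBytes =
            wholeInputs.stream().mapToLong(BlockingInputInfo::getNumBytesProduced).sum();
    // Compute the group's share as a double to avoid long division truncating to 0.
    double ratio = totalBytes == 0 ? 0.0 : (double) groupBytes / totalBytes;
    return (long) (ratio * globalDataVolumePerTask);
}
```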



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/util/AllToAllVertexInputInfoComputer.java:
##########
@@ -356,54 +351,124 @@ private static List<IndexRange> computePartitionRangesEvenlyData(
         return splitPartitionRange;
     }
 
-    private static long[] computeDataBytesPerSlice(
-            int subpartitionIndex,
-            List<IndexRange> partitionRanges,
-            Map<Integer, long[]> subpartitionBytesByPartitionIndex) {
-        long[] dataBytesPerSlice = new long[partitionRanges.size()];
-        for (int i = 0; i < partitionRanges.size(); ++i) {
-            IndexRange partitionIndexRange = partitionRanges.get(i);
-            dataBytesPerSlice[i] =
-                    IntStream.rangeClosed(
-                                    partitionIndexRange.getStartIndex(),
-                                    partitionIndexRange.getEndIndex())
-                            .mapToLong(
-                                    partitionIndex ->
-                                            subpartitionBytesByPartitionIndex
-                                                    .get(partitionIndex)[subpartitionIndex])
-                            .sum();
-        }
-        return dataBytesPerSlice;
-    }
-
     private static Map<IntermediateDataSetID, JobVertexInputInfo> createJobVertexInputInfos(
+            List<BlockingInputInfo> inputInfos,
             Map<Integer, List<SubpartitionSlice>> subpartitionSlices,
-            List<BlockingInputInfo> nonBroadcastInputInfos,
-            List<BlockingInputInfo> broadcastInputInfos,
             List<IndexRange> subpartitionSliceRanges) {
         final Map<IntermediateDataSetID, JobVertexInputInfo> vertexInputInfos = new HashMap<>();
-        for (BlockingInputInfo inputInfo : nonBroadcastInputInfos) {
-            List<ExecutionVertexInputInfo> executionVertexInputInfos =
-                    createdExecutionVertexInputInfosForNonBroadcast(
-                            inputInfo,
-                            subpartitionSliceRanges,
-                            subpartitionSlices.get(inputInfo.getInputTypeNumber()));
-            vertexInputInfos.put(
-                    inputInfo.getResultId(), new JobVertexInputInfo(executionVertexInputInfos));
+        for (BlockingInputInfo inputInfo : inputInfos) {
+            if (inputInfo.isBroadcast()) {
+                vertexInputInfos.put(
+                        inputInfo.getResultId(),
+                        createdJobVertexInputInfoForBroadcast(
+                                inputInfo, subpartitionSliceRanges.size()));
+            } else {
+                vertexInputInfos.put(
+                        inputInfo.getResultId(),
+                        createdJobVertexInputInfoForNonBroadcast(
+                                inputInfo,
+                                subpartitionSliceRanges,
+                                subpartitionSlices.get(inputInfo.getInputTypeNumber())));
+            }
         }
+        return vertexInputInfos;
+    }
 
-        for (BlockingInputInfo inputInfo : broadcastInputInfos) {
-            List<ExecutionVertexInputInfo> executionVertexInputInfos =
-                    createdExecutionVertexInputInfosForBroadcast(
-                            inputInfo, subpartitionSliceRanges.size());
+    private Map<IntermediateDataSetID, JobVertexInputInfo>
+            computeJobVertexInputInfosForInputsWithoutInterKeysCorrelation(
+                    List<BlockingInputInfo> inputInfos, int parallelism, long dataVolumePerTask) {
+        long totalDataBytes =
+                inputInfos.stream().mapToLong(BlockingInputInfo::getNumBytesProduced).sum();
+        Map<IntermediateDataSetID, JobVertexInputInfo> vertexInputInfos = new HashMap<>();
+        // For inputs without inter-keys correlation, we should process them one-by-one.
+        for (BlockingInputInfo inputInfo : inputInfos) {
             vertexInputInfos.put(
-                    inputInfo.getResultId(), new JobVertexInputInfo(executionVertexInputInfos));
+                    inputInfo.getResultId(),
+                    computeVertexInputInfoForInputWithoutInterKeysCorrelation(
+                            inputInfo,
+                            parallelism,
+                            inputInfo.getNumBytesProduced() / totalDataBytes * dataVolumePerTask));

Review Comment:
   The result can be incorrect here as well, for the same integer-division reason as in PointwiseVertexInputInfoComputer#compute above.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
