zhuzhurk commented on code in PR #25552:
URL: https://github.com/apache/flink/pull/25552#discussion_r1908594787
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/util/PointwiseVertexInputInfoComputer.java:
##########
@@ -46,30 +44,34 @@ public class PointwiseVertexInputInfoComputer {
    private static final Logger LOG =
            LoggerFactory.getLogger(PointwiseVertexInputInfoComputer.class);
-    private final long dataVolumePerTask;
-
-    public PointwiseVertexInputInfoComputer(long dataVolumePerTask) {
-        this.dataVolumePerTask = dataVolumePerTask;
-    }
+    public PointwiseVertexInputInfoComputer() {}
Review Comment:
It's fine to just remove this constructor.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/util/PointwiseVertexInputInfoComputer.java:
##########
@@ -46,30 +44,34 @@ public class PointwiseVertexInputInfoComputer {
    private static final Logger LOG =
            LoggerFactory.getLogger(PointwiseVertexInputInfoComputer.class);
-    private final long dataVolumePerTask;
-
-    public PointwiseVertexInputInfoComputer(long dataVolumePerTask) {
-        this.dataVolumePerTask = dataVolumePerTask;
-    }
+    public PointwiseVertexInputInfoComputer() {}
    /**
     * Computes the input information for a job vertex based on the provided blocking input
     * information and parallelism.
     *
     * @param inputInfos List of blocking input information for the job vertex.
     * @param parallelism Parallelism of the job vertex.
+     * @param dataVolumePerTask Proposed data volume per task for this set of inputInfos.
     * @return A map of intermediate data set IDs to their corresponding job vertex input
     *     information.
     */
    public Map<IntermediateDataSetID, JobVertexInputInfo> compute(
-            List<BlockingInputInfo> inputInfos, int parallelism) {
-        checkArgument(
-                inputInfos.stream().noneMatch(BlockingInputInfo::areInterInputsKeysCorrelated));
+            List<BlockingInputInfo> inputInfos, int parallelism, long dataVolumePerTask) {
+        long totalDataBytes =
+                inputInfos.stream().mapToLong(BlockingInputInfo::getNumBytesProduced).sum();
        Map<IntermediateDataSetID, JobVertexInputInfo> vertexInputInfos = new HashMap<>();
        for (BlockingInputInfo inputInfo : inputInfos) {
+            // Currently, all inputs passed to this method must not have inter-inputs key
+            // correlation. If other possibilities are introduced in the future, please add
+            // new branches to this method.
+            checkState(!inputInfo.areInterInputsKeysCorrelated());
            vertexInputInfos.put(
                    inputInfo.getResultId(),
-                    computeVertexInputInfo(inputInfo, parallelism, dataVolumePerTask));
+                    computeVertexInputInfo(
+                            inputInfo,
+                            parallelism,
+                            inputInfo.getNumBytesProduced() / totalDataBytes * dataVolumePerTask));
Review Comment:
The result can be wrong: `inputInfo.getNumBytesProduced() / totalDataBytes` is evaluated in long arithmetic, so the ratio truncates toward zero (it is 0 whenever this input produced less than the total). Maybe convert it to double or BigDecimal to do the calculation?
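
For illustration, a minimal sketch of the double-based variant (the helper name is hypothetical; only `getNumBytesProduced()`, `totalDataBytes`, and `dataVolumePerTask` come from the diff above):

```java
// Sketch: compute one input's share of the data volume in double arithmetic
// to avoid long-division truncation. Assumes totalDataBytes > 0.
private static long computeDataVolumePerTaskForInput(
        long numBytesProduced, long totalDataBytes, long dataVolumePerTask) {
    // The cast happens before the division, so the ratio keeps its
    // fractional part instead of truncating to 0 or 1.
    return (long) ((double) numBytesProduced / totalDataBytes * dataVolumePerTask);
}
```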
##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismAndInputInfosDeciderTest.java:
##########
@@ -500,7 +521,14 @@ static DefaultVertexParallelismAndInputInfosDecider createDecider(
                BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_DEFAULT_SOURCE_PARALLELISM,
                defaultSourceParallelism);
-        return DefaultVertexParallelismAndInputInfosDecider.from(maxParallelism, configuration);
+        return DefaultVertexParallelismAndInputInfosDecider.from(
+                maxParallelism,
+                BatchExecutionOptionsInternal.ADAPTIVE_SKEWED_OPTIMIZATION_SKEWED_FACTOR
+                        .defaultValue(),
+                BatchExecutionOptionsInternal.ADAPTIVE_SKEWED_OPTIMIZATION_SKEWED_THRESHOLD
+                        .defaultValue()
+                        .getBytes(),
+                configuration);
Review Comment:
Could you introduce a method
`createDefaultVertexParallelismAndInputInfosDecider(maxParallelism,
configuration, skewedFactor, skewedThreshold)` to avoid such duplication?
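
One possible shape for that helper, assuming the `from(...)` argument order shown in the diff above (the parameter types for the skew settings are inferred: the factor as a double, the threshold as bytes, per `.getBytes()`):

```java
// Sketch of the suggested test helper; call sites then pass only the
// values they want to vary instead of repeating the default lookups.
private static DefaultVertexParallelismAndInputInfosDecider
        createDefaultVertexParallelismAndInputInfosDecider(
                int maxParallelism,
                Configuration configuration,
                double skewedFactor,
                long skewedThreshold) {
    return DefaultVertexParallelismAndInputInfosDecider.from(
            maxParallelism, skewedFactor, skewedThreshold, configuration);
}
```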
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/util/VertexParallelismAndInputInfosDeciderUtils.java:
##########
@@ -653,4 +643,13 @@ private static Map<IndexRange, IndexRange> reverseIndexRangeMap(
        }
        return reversedRangeMap;
    }
+
+    public static long calculateDataVolumePerTaskForInputsGroup(
+            long globalDataVolumePerTask,
+            List<BlockingInputInfo> inputsGroup,
+            List<BlockingInputInfo> wholeInputs) {
+        return inputsGroup.stream().mapToLong(BlockingInputInfo::getNumBytesProduced).sum()
Review Comment:
Such a computation is not correct when done in int/long arithmetic; the integer division truncates the ratio.
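
A sketch of what the helper could look like with the division done in double (the tail of the method is cut off in the excerpt above, so the exact original expression is assumed):

```java
// Sketch: scale the global data volume by this group's share of the total
// produced bytes, dividing in double so the ratio is not truncated to 0.
public static long calculateDataVolumePerTaskForInputsGroup(
        long globalDataVolumePerTask,
        List<BlockingInputInfo> inputsGroup,
        List<BlockingInputInfo> wholeInputs) {
    long groupBytes =
            inputsGroup.stream().mapToLong(BlockingInputInfo::getNumBytesProduced).sum();
    long totalBytes =
            wholeInputs.stream().mapToLong(BlockingInputInfo::getNumBytesProduced).sum();
    return (long) ((double) groupBytes / totalBytes * globalDataVolumePerTask);
}
```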
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/util/AllToAllVertexInputInfoComputer.java:
##########
@@ -356,54 +351,124 @@ private static List<IndexRange> computePartitionRangesEvenlyData(
        return splitPartitionRange;
    }
-    private static long[] computeDataBytesPerSlice(
-            int subpartitionIndex,
-            List<IndexRange> partitionRanges,
-            Map<Integer, long[]> subpartitionBytesByPartitionIndex) {
-        long[] dataBytesPerSlice = new long[partitionRanges.size()];
-        for (int i = 0; i < partitionRanges.size(); ++i) {
-            IndexRange partitionIndexRange = partitionRanges.get(i);
-            dataBytesPerSlice[i] =
-                    IntStream.rangeClosed(
-                                    partitionIndexRange.getStartIndex(),
-                                    partitionIndexRange.getEndIndex())
-                            .mapToLong(
-                                    partitionIndex ->
-                                            subpartitionBytesByPartitionIndex
-                                                    .get(partitionIndex)[subpartitionIndex])
-                            .sum();
-        }
-        return dataBytesPerSlice;
-    }
-
    private static Map<IntermediateDataSetID, JobVertexInputInfo> createJobVertexInputInfos(
+            List<BlockingInputInfo> inputInfos,
            Map<Integer, List<SubpartitionSlice>> subpartitionSlices,
-            List<BlockingInputInfo> nonBroadcastInputInfos,
-            List<BlockingInputInfo> broadcastInputInfos,
            List<IndexRange> subpartitionSliceRanges) {
        final Map<IntermediateDataSetID, JobVertexInputInfo> vertexInputInfos = new HashMap<>();
-        for (BlockingInputInfo inputInfo : nonBroadcastInputInfos) {
-            List<ExecutionVertexInputInfo> executionVertexInputInfos =
-                    createdExecutionVertexInputInfosForNonBroadcast(
-                            inputInfo,
-                            subpartitionSliceRanges,
-                            subpartitionSlices.get(inputInfo.getInputTypeNumber()));
-            vertexInputInfos.put(
-                    inputInfo.getResultId(), new JobVertexInputInfo(executionVertexInputInfos));
+        for (BlockingInputInfo inputInfo : inputInfos) {
+            if (inputInfo.isBroadcast()) {
+                vertexInputInfos.put(
+                        inputInfo.getResultId(),
+                        createdJobVertexInputInfoForBroadcast(
+                                inputInfo, subpartitionSliceRanges.size()));
+            } else {
+                vertexInputInfos.put(
+                        inputInfo.getResultId(),
+                        createdJobVertexInputInfoForNonBroadcast(
+                                inputInfo,
+                                subpartitionSliceRanges,
+                                subpartitionSlices.get(inputInfo.getInputTypeNumber())));
+            }
        }
+        return vertexInputInfos;
+    }

-        for (BlockingInputInfo inputInfo : broadcastInputInfos) {
-            List<ExecutionVertexInputInfo> executionVertexInputInfos =
-                    createdExecutionVertexInputInfosForBroadcast(
-                            inputInfo, subpartitionSliceRanges.size());
+    private Map<IntermediateDataSetID, JobVertexInputInfo>
+            computeJobVertexInputInfosForInputsWithoutInterKeysCorrelation(
+                    List<BlockingInputInfo> inputInfos, int parallelism, long dataVolumePerTask) {
+        long totalDataBytes =
+                inputInfos.stream().mapToLong(BlockingInputInfo::getNumBytesProduced).sum();
+        Map<IntermediateDataSetID, JobVertexInputInfo> vertexInputInfos = new HashMap<>();
+        // For inputs without inter-inputs key correlation, we should process them one by one.
+        for (BlockingInputInfo inputInfo : inputInfos) {
            vertexInputInfos.put(
-                    inputInfo.getResultId(), new JobVertexInputInfo(executionVertexInputInfos));
+                    inputInfo.getResultId(),
+                    computeVertexInputInfoForInputWithoutInterKeysCorrelation(
+                            inputInfo,
+                            parallelism,
+                            inputInfo.getNumBytesProduced() / totalDataBytes * dataVolumePerTask));
Review Comment:
The result can be incorrect for the same reason as in PointwiseVertexInputInfoComputer: the long division truncates before the multiplication.
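
A tiny standalone example of the truncation, with made-up sizes:

```java
public class TruncationDemo {
    public static void main(String[] args) {
        // Hypothetical sizes: this input produced 300 of 1000 total bytes.
        long numBytesProduced = 300L;
        long totalDataBytes = 1_000L;
        long dataVolumePerTask = 128L * 1024 * 1024; // 128 MiB

        // Long arithmetic: 300 / 1000 == 0, so the whole product is 0.
        long truncated = numBytesProduced / totalDataBytes * dataVolumePerTask;

        // Double arithmetic keeps the 0.3 ratio, giving roughly 38.4 MiB.
        long expected = (long) ((double) numBytesProduced / totalDataBytes * dataVolumePerTask);

        System.out.println(truncated + " vs " + expected); // prints: 0 vs 40265318
    }
}
```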
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]