zhuzhurk commented on code in PR #21695:
URL: https://github.com/apache/flink/pull/21695#discussion_r1082900221
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/BlockingResultInfo.java:
##########
@@ -39,6 +40,16 @@ public interface BlockingResultInfo extends IntermediateResultInfo {
*/
long getNumBytesProduced();
+ /**
+ * Return the aggregated num of bytes according to the index range for partition and
+ * subpartition.
+ *
+ * @param partitionIndexRange range of the index of the consumed partition.
+ * @param subpartitionIndexRange range of the index of the consumed subpartition.
+ * @return aggregated input bytes according to the index ranges.
Review Comment:
Suggested wording for the `@return` text: "aggregated input bytes" -> "aggregated bytes".
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/slowtaskdetector/ExecutionTimeBasedSlowTaskDetector.java:
##########
@@ -168,21 +168,34 @@ private double getFinishedRatio(final ExecutionJobVertex executionJobVertex) {
return (double) finishedCount / executionJobVertex.getTaskVertices().length;
}
- private long getBaseline(
+ private ExecutionTimeWithInputNumBytes getBaseline(
final ExecutionJobVertex executionJobVertex, final long currentTimeMillis) {
- final long executionTimeMedian =
+ final ExecutionTimeWithInputNumBytes weightedExecutionTimeMedian =
calculateFinishedTaskExecutionTimeMedian(executionJobVertex, currentTimeMillis);
- return (long) Math.max(baselineLowerBoundMillis, executionTimeMedian * baselineMultiplier);
+ long multipliedBaseline =
+ (long) (weightedExecutionTimeMedian.getExecutionTime() * baselineMultiplier);
+ if (multipliedBaseline < baselineLowerBoundMillis) {
+ double inputNumBytesMultiplier =
+ baselineLowerBoundMillis * 1.0 / weightedExecutionTimeMedian.getExecutionTime();
+ return new ExecutionTimeWithInputNumBytes(
+ baselineLowerBoundMillis,
+ (long)
+ (weightedExecutionTimeMedian.getInputNumBytes()
+ * inputNumBytesMultiplier));
Review Comment:
The change looks good to me. Could you add a test to check this case?
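
For reference, the lower-bound case in the hunk above can be sketched in isolation as follows. This is only an illustration, not the PR's code: the class name `BaselineSketch`, the method `computeBaseline`, and the `long[]` return value are hypothetical, and the branch taken when the multiplied baseline is not below the lower bound is an assumption, since the diff does not show it.

```java
// Hypothetical standalone sketch of the baseline computation discussed above.
final class BaselineSketch {

    /**
     * Returns {baselineTimeMillis, baselineInputBytes}.
     * When the multiplied median falls below the lower bound, the input bytes are
     * scaled by the same factor that lifts the time up to the lower bound.
     */
    static long[] computeBaseline(
            long medianTimeMillis,
            long medianInputBytes,
            double baselineMultiplier,
            long baselineLowerBoundMillis) {
        long multipliedBaseline = (long) (medianTimeMillis * baselineMultiplier);
        if (multipliedBaseline < baselineLowerBoundMillis) {
            // Mirrors the diff: scale bytes by lowerBound / medianTime.
            double inputBytesMultiplier = baselineLowerBoundMillis * 1.0 / medianTimeMillis;
            return new long[] {
                baselineLowerBoundMillis, (long) (medianInputBytes * inputBytesMultiplier)
            };
        }
        // Assumption (not shown in the diff): otherwise both values are scaled by
        // baselineMultiplier.
        return new long[] {multipliedBaseline, (long) (medianInputBytes * baselineMultiplier)};
    }
}
```

With a median of 100 ms and 1000 bytes, a multiplier of 1.5 and a lower bound of 600 ms, the multiplied baseline (150 ms) is below the bound, so the sketch returns 600 ms and 6000 bytes. A test for the requested case could assert on values of this kind.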
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/slowtaskdetector/ExecutionTimeBasedSlowTaskDetector.java:
##########
@@ -263,37 +257,38 @@ public void stop() {
}
}
- /** This class defines the execution time and input number bytes for an execution. */
+ /** This class defines the execution time and input bytes for an execution. */
@VisibleForTesting
- static class ExecutionTimeWithInputNumBytes
- implements Comparable<ExecutionTimeWithInputNumBytes> {
+ static class ExecutionTimeWithInputBytes implements Comparable<ExecutionTimeWithInputBytes> {
private final long executionTime;
- private final long inputNumBytes;
+ private final long inputBytes;
- public ExecutionTimeWithInputNumBytes(long executionTime, long inputNumBytes) {
+ public ExecutionTimeWithInputBytes(long executionTime, long inputBytes) {
this.executionTime = executionTime;
- this.inputNumBytes = inputNumBytes;
+ this.inputBytes = inputBytes;
}
public long getExecutionTime() {
return executionTime;
}
- public long getInputNumBytes() {
- return inputNumBytes;
+ public long getInputBytes() {
+ return inputBytes;
}
@Override
- public int compareTo(ExecutionTimeWithInputNumBytes other) {
- // We should guarantee that all sorted elements' inputNumBytes are all zero or non-zero,
+ public int compareTo(ExecutionTimeWithInputBytes other) {
+ // We should guarantee that all sorted elements' inputBytes are all UNKNOWN or assigned,
// otherwise it may cause ambiguity.
- if (inputNumBytes == 0 || other.getInputNumBytes() == 0) {
+ if (inputBytes == NUM_BYTES_UNKNOWN || other.getInputBytes() == NUM_BYTES_UNKNOWN) {
Review Comment:
It would be better to check that if one value is UNKNOWN, the other is UNKNOWN as well.
If they differ, an exception should be thrown so that the bug is exposed rather than hidden.
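
A minimal sketch of the check suggested above, under stated assumptions: the value `-1` for `NUM_BYTES_UNKNOWN`, the choice of `IllegalStateException`, and the time-per-byte comparison in the "both known" branch are all illustrative and are not taken from the PR.

```java
// Hypothetical standalone version of the class under review.
final class ExecutionTimeWithInputBytes implements Comparable<ExecutionTimeWithInputBytes> {

    // Assumption: sentinel value for "input bytes not known".
    static final long NUM_BYTES_UNKNOWN = -1L;

    private final long executionTime;
    private final long inputBytes;

    ExecutionTimeWithInputBytes(long executionTime, long inputBytes) {
        this.executionTime = executionTime;
        this.inputBytes = inputBytes;
    }

    long getExecutionTime() {
        return executionTime;
    }

    long getInputBytes() {
        return inputBytes;
    }

    @Override
    public int compareTo(ExecutionTimeWithInputBytes other) {
        boolean thisUnknown = inputBytes == NUM_BYTES_UNKNOWN;
        boolean otherUnknown = other.inputBytes == NUM_BYTES_UNKNOWN;
        // Fail fast when only one side is UNKNOWN instead of silently comparing.
        if (thisUnknown != otherUnknown) {
            throw new IllegalStateException(
                    "Cannot compare elements when only one of them has UNKNOWN input bytes.");
        }
        if (thisUnknown) {
            // Both UNKNOWN: fall back to plain execution time.
            return Long.compare(executionTime, other.executionTime);
        }
        // Assumption: with known input bytes, compare execution time per byte.
        // Math.max(..., 1L) only guards this sketch against division by zero.
        return Double.compare(
                (double) executionTime / Math.max(inputBytes, 1L),
                (double) other.executionTime / Math.max(other.inputBytes, 1L));
    }
}
```

With this shape, sorting a list that mixes UNKNOWN and assigned values fails on the first mixed comparison, which is the behavior the comment asks for.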