[
https://issues.apache.org/jira/browse/FLINK-31977?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17719423#comment-17719423
]
Tan Kim commented on FLINK-31977:
---------------------------------
I understand what you're saying.
However, part of the code is a bit hard to follow because it has multiple
returns spread across conditional statements.
I think I can change it to something more intuitive, like this. What do you
think?
This is unrelated to the ticket's original suggestion of avoiding the
unnecessary function call when SCALING_EFFECTIVENESS_DETECTION_ENABLED is
false, but it does make the code a little easier to understand.
{code:java}
private boolean detectIneffectiveScaleUp(
        AbstractFlinkResource<?, ?> resource,
        JobVertexID vertex,
        Configuration conf,
        Map<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics,
        ScalingSummary lastSummary) {

    double lastProcRate =
            lastSummary.getMetrics().get(TRUE_PROCESSING_RATE).getAverage();
    double lastExpectedProcRate =
            lastSummary.getMetrics().get(EXPECTED_PROCESSING_RATE).getCurrent();
    var currentProcRate =
            evaluatedMetrics.get(TRUE_PROCESSING_RATE).getAverage();

    // To judge the effectiveness of the scale up operation we compute how
    // much of the expected increase actually happened. For example if we
    // expect a 100 increase in proc rate and only got an increase of 10 we
    // only accomplished 10% of the desired increase. If this number is
    // below the threshold, we mark the scaling ineffective.
    double expectedIncrease = lastExpectedProcRate - lastProcRate;
    double actualIncrease = currentProcRate - lastProcRate;

    boolean isIneffectiveScaleUp =
            (actualIncrease / expectedIncrease)
                    < conf.get(AutoScalerOptions.SCALING_EFFECTIVENESS_THRESHOLD);

    if (isIneffectiveScaleUp) {
        var message = String.format(INNEFFECTIVE_MESSAGE_FORMAT, vertex);

        eventRecorder.triggerEvent(
                resource,
                EventRecorder.Type.Normal,
                EventRecorder.Reason.IneffectiveScaling,
                EventRecorder.Component.Operator,
                message);

        if (conf.get(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED)) {
            LOG.info(
                    "Ineffective scaling detected for {}, expected increase {}, actual {}",
                    vertex,
                    expectedIncrease,
                    actualIncrease);
            return true;
        }
    }
    return false;
}
{code}
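To make the threshold comparison concrete, here is a minimal standalone sketch of the ratio check, using the numbers from the code comment (an expected increase of 100 and an actual increase of 10). The class name, the method names, and the 0.2 threshold are made up for illustration and are not part of the operator code; the sketch only mirrors the arithmetic above.

```java
// Hypothetical standalone sketch; not the operator's actual classes.
class ScaleUpEffectivenessSketch {

    /** Fraction of the expected processing-rate increase that actually happened. */
    static double achievedFraction(
            double lastProcRate, double lastExpectedProcRate, double currentProcRate) {
        double expectedIncrease = lastExpectedProcRate - lastProcRate;
        double actualIncrease = currentProcRate - lastProcRate;
        return actualIncrease / expectedIncrease;
    }

    /** True if the achieved fraction falls below the (assumed) threshold. */
    static boolean isIneffective(
            double lastProcRate,
            double lastExpectedProcRate,
            double currentProcRate,
            double threshold) {
        return achievedFraction(lastProcRate, lastExpectedProcRate, currentProcRate)
                < threshold;
    }

    public static void main(String[] args) {
        // Expected: 1000 -> 1100 (+100). Observed: 1000 -> 1010 (+10).
        double fraction = achievedFraction(1000.0, 1100.0, 1010.0);
        System.out.println("achieved fraction: " + fraction);

        // 0.2 is an assumed threshold chosen only for this example.
        System.out.println("ineffective: " + isIneffective(1000.0, 1100.0, 1010.0, 0.2));
    }
}
```

With these inputs only about 10% of the expected increase was achieved, so the scale-up counts as ineffective under the assumed threshold. The sketch does not guard against an expected increase of zero, where the division yields NaN or Infinity.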
> If scaling.effectiveness.detection.enabled is false, the call to the
> detectIneffectiveScaleUp() function is unnecessary
> -----------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-31977
> URL: https://issues.apache.org/jira/browse/FLINK-31977
> Project: Flink
> Issue Type: Improvement
> Components: Autoscaler
> Affects Versions: 1.17.0
> Reporter: Tan Kim
> Priority: Minor
>
> The code below is a function that detects ineffective scale-ups.
> It performs all the computations needed for detection and only then checks
> SCALING_EFFECTIVENESS_DETECTION_ENABLED
> (scaling.effectiveness.detection.enabled), returning true only if that
> option is true. When the option is false, the computation is unnecessary.
> {code:java}
> // JobVertexScaler.java #175
> private boolean detectIneffectiveScaleUp(
>         AbstractFlinkResource<?, ?> resource,
>         JobVertexID vertex,
>         Configuration conf,
>         Map<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics,
>         ScalingSummary lastSummary) {
>
>     // 22569.315633422066
>     double lastProcRate =
>             lastSummary.getMetrics().get(TRUE_PROCESSING_RATE).getAverage();
>     // 37340.0
>     double lastExpectedProcRate =
>             lastSummary.getMetrics().get(EXPECTED_PROCESSING_RATE).getCurrent();
>     var currentProcRate =
>             evaluatedMetrics.get(TRUE_PROCESSING_RATE).getAverage();
>
>     // To judge the effectiveness of the scale up operation we compute how
>     // much of the expected increase actually happened. For example if we
>     // expect a 100 increase in proc rate and only got an increase of 10 we
>     // only accomplished 10% of the desired increase. If this number is
>     // below the threshold, we mark the scaling ineffective.
>     double expectedIncrease = lastExpectedProcRate - lastProcRate;
>     double actualIncrease = currentProcRate - lastProcRate;
>
>     boolean withinEffectiveThreshold =
>             (actualIncrease / expectedIncrease)
>                     >= conf.get(AutoScalerOptions.SCALING_EFFECTIVENESS_THRESHOLD);
>     if (withinEffectiveThreshold) {
>         return false;
>     }
>
>     var message = String.format(INNEFFECTIVE_MESSAGE_FORMAT, vertex);
>
>     eventRecorder.triggerEvent(
>             resource,
>             EventRecorder.Type.Normal,
>             EventRecorder.Reason.IneffectiveScaling,
>             EventRecorder.Component.Operator,
>             message);
>
>     if (conf.get(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED)) {
>         LOG.info(message);
>         return true;
>     } else {
>         return false;
>     }
> }
> {code}
> In the call to the detectIneffectiveScaleUp function, I would suggest
> checking SCALING_EFFECTIVENESS_DETECTION_ENABLED first, as follows.
> {code:java}
> // JobVertexScaler.java #150
> if (currentParallelism == lastSummary.getNewParallelism()
>         && lastSummary.isScaledUp()) {
>     if (scaledUp) {
>         if (conf.get(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED)) {
>             return detectIneffectiveScaleUp(
>                     resource, vertex, conf, evaluatedMetrics, lastSummary);
>         } else {
>             // With detection disabled, detectIneffectiveScaleUp() always
>             // returns false, so return false here to keep the same result.
>             return false;
>         }
>     } else {
>         return detectImmediateScaleDownAfterScaleUp(
>                 vertex, conf, lastScalingTs);
>     }
> }
> {code}