gyfora commented on code in PR #735:
URL:
https://github.com/apache/flink-kubernetes-operator/pull/735#discussion_r1431637325
##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java:
##########
@@ -182,22 +182,22 @@ private void runScalingLogic(Context ctx, AutoscalerFlinkMetrics autoscalerMetri
jobTopology.getVerticesInTopologicalOrder(),
() -> lastEvaluatedMetrics.get(ctx.getJobKey()));
- if (!collectedMetrics.isFullyCollected()) {
- // We have done an upfront evaluation, but we are not ready for scaling.
- resetRecommendedParallelism(evaluatedMetrics);
- return;
- }
-
var scalingHistory = getTrimmedScalingHistory(stateStore, ctx, now);
// A scaling tracking without an end time gets created whenever a scaling decision is
// applied. Here, when the job transitions to RUNNING, we record the time for it.
if (ctx.getJobStatus() == JobStatus.RUNNING) {
- if (scalingTracking.setEndTimeIfTrackedAndParallelismMatches(
+ if (scalingTracking.recordRestartDurationIfTrackedAndParallelismMatches(
now, jobTopology, scalingHistory)) {
stateStore.storeScalingTracking(ctx, scalingTracking);
}
}
Review Comment:
Can you please extract this logic into a method to keep the flow simpler?
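
A minimal sketch of what such an extraction could look like. It is not the code from the PR: `ScalingTracking`, `StateStore`, and `JobStatus` below are simplified, hypothetical stand-ins for the real classes (the real `recordRestartDurationIfTrackedAndParallelismMatches` also takes the job topology and scaling history), and the helper name `recordRestartDurationIfRunning` is an assumption chosen for illustration.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

public class ExtractRestartTracking {

    // Hypothetical stand-in for org.apache.flink.api.common.JobStatus.
    enum JobStatus { RUNNING, RESTARTING }

    // Hypothetical, simplified stand-in for the PR's ScalingTracking.
    static class ScalingTracking {
        private boolean restartPending = true;
        Instant restartEnd;

        // Returns true only when a pending restart was actually recorded.
        boolean recordRestartDurationIfTrackedAndParallelismMatches(Instant now) {
            if (!restartPending) {
                return false;
            }
            restartPending = false;
            restartEnd = now;
            return true;
        }
    }

    // Hypothetical stand-in for the autoscaler state store.
    static class StateStore {
        final List<ScalingTracking> stored = new ArrayList<>();

        void storeScalingTracking(ScalingTracking tracking) {
            stored.add(tracking);
        }
    }

    // The extracted helper: the caller no longer needs the nested ifs inline.
    static boolean recordRestartDurationIfRunning(
            JobStatus status, ScalingTracking tracking, StateStore store, Instant now) {
        if (status != JobStatus.RUNNING) {
            return false;
        }
        if (!tracking.recordRestartDurationIfTrackedAndParallelismMatches(now)) {
            return false;
        }
        // Persist only when the tracking actually changed.
        store.storeScalingTracking(tracking);
        return true;
    }

    public static void main(String[] args) {
        StateStore store = new StateStore();
        ScalingTracking tracking = new ScalingTracking();
        Instant now = Instant.now();

        System.out.println(
                recordRestartDurationIfRunning(JobStatus.RUNNING, tracking, store, now));
        System.out.println(
                recordRestartDurationIfRunning(JobStatus.RUNNING, tracking, store, now));
        System.out.println(store.stored.size());
    }
}
```

With a helper along these lines, the corresponding part of `runScalingLogic` would shrink to a single call, and the early returns would make the two conditions (job is RUNNING, tracking was updated) easier to read than the nested `if` blocks.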