gyfora commented on code in PR #613:
URL: https://github.com/apache/flink-kubernetes-operator/pull/613#discussion_r1222525508
##########
flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java:
##########
@@ -143,4 +163,18 @@ private AutoscalerFlinkMetrics getOrInitAutoscalerFlinkMetrics(
                 new AutoscalerFlinkMetrics(
                         ctx.getResourceMetricGroup().addGroup("AutoScaler")));
     }
+
+    private void resetRecommendedParallelisms(
+            Map<ResourceID, Map<JobVertexID, Integer>> recommendedParallelisms,
+            Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> lastEvaluatedMetrics,
+            ResourceID resourceID) {
+        var parallelisms = new HashMap<JobVertexID, Integer>();
+        lastEvaluatedMetrics.forEach(
+                (jobVertexID, map) -> {
+                    parallelisms.put(
+                            jobVertexID,
+                            (int) map.get(ScalingMetric.PARALLELISM).getCurrent());
+                });
+        recommendedParallelisms.put(resourceID, parallelisms);
+        LOG.debug("Recommended parallelisms are reset to current {}", parallelisms);
+    }
Review Comment:
This could be removed and unified with the `updateRecommendedParallelisms`
method (maybe make it a static utility?), since resetting is the same as
updating with empty scaling summaries.
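
The unification the comment suggests could look roughly like this. A minimal sketch only, with simplified stand-in types (`String` in place of `ResourceID`/`JobVertexID`, plain `Integer` parallelisms) because the real operator classes are not shown in this diff; the point is just that a reset is an update with an empty scaling-summary map:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical static utility: with an empty scalingSummaries map, every
// vertex falls back to its currently evaluated parallelism, which is exactly
// the "reset" behavior the removed method implemented by hand.
class ParallelismUtils {

    static void updateRecommendedParallelisms(
            Map<String, Map<String, Integer>> recommendedParallelisms,
            Map<String, Integer> currentParallelisms,
            Map<String, Integer> scalingSummaries, // empty map == reset
            String resourceId) {
        var parallelisms = new HashMap<String, Integer>();
        currentParallelisms.forEach(
                (vertex, current) ->
                        parallelisms.put(
                                vertex, scalingSummaries.getOrDefault(vertex, current)));
        recommendedParallelisms.put(resourceId, parallelisms);
    }
}
```

With this shape, the reset call site becomes `updateRecommendedParallelisms(recommendedParallelisms, currentParallelisms, Map.of(), resourceId)`.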
##########
flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java:
##########
@@ -99,22 +104,37 @@ public boolean scale(FlinkResourceContext<? extends AbstractFlinkResource<?, ?>>
         var collectedMetrics =
                 metricsCollector.updateMetrics(
-                        resource, autoScalerInfo, ctx.getFlinkService(), conf);
+                        resource,
+                        autoScalerInfo,
+                        ctx.getFlinkService(),
+                        conf,
+                        recommendedParallelisms);
         LOG.debug("Evaluating scaling metrics for {}", collectedMetrics);
         var evaluatedMetrics = evaluator.evaluate(conf, collectedMetrics);
         LOG.debug("Scaling metrics evaluated: {}", evaluatedMetrics);
         lastEvaluatedMetrics.put(resourceId, evaluatedMetrics);
-        flinkMetrics.registerScalingMetrics(() -> lastEvaluatedMetrics.get(resourceId));
+
+        flinkMetrics.registerEvaluatedScalingMetrics(
+                () -> lastEvaluatedMetrics.get(resourceId));
+        flinkMetrics.registerRecommendedParallelismMetrics(
+                () -> recommendedParallelisms.get(resourceId));
         if (!collectedMetrics.isFullyCollected()) {
             // We have done an upfront evaluation, but we are not ready for scaling.
+            resetRecommendedParallelisms(recommendedParallelisms, evaluatedMetrics, resourceId);
             autoScalerInfo.replaceInKubernetes(kubernetesClient);
             return false;
         }
         var specAdjusted =
-                scalingExecutor.scaleResource(resource, autoScalerInfo, conf, evaluatedMetrics);
+                scalingExecutor.scaleResource(
+                        resource,
+                        autoScalerInfo,
+                        conf,
+                        evaluatedMetrics,
+                        recommendedParallelisms);
Review Comment:
Not something that should be fixed in this PR, but we should consider a
refactor that introduces some sort of autoscaler resource context to avoid
passing so many params.
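
A rough illustration of that refactor, purely hypothetical: a small context record bundling the arguments currently threaded through `updateMetrics` and `scaleResource`. Generic parameters and `Object` fields stand in for the real resource, service, and config classes, which are not part of this diff:

```java
import java.util.Map;

// Hypothetical sketch only: one holder for the parameters that scale()
// currently passes one by one. Field names mirror the diff above; the
// simplified map types stand in for ResourceID/JobVertexID keyed maps.
record AutoscalerResourceContext<RESOURCE, SERVICE, CONF>(
        RESOURCE resource,
        Object autoScalerInfo,
        SERVICE flinkService,
        CONF conf,
        Map<String, Map<String, Integer>> recommendedParallelisms) {}
```

Call sites would then shrink to something like `scalingExecutor.scaleResource(context, evaluatedMetrics)` (hypothetical signature), and adding a new shared field later would no longer touch every method in the chain.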
##########
flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java:
##########
@@ -99,22 +104,37 @@ public boolean scale(FlinkResourceContext<? extends AbstractFlinkResource<?, ?>>
         var collectedMetrics =
                 metricsCollector.updateMetrics(
-                        resource, autoScalerInfo, ctx.getFlinkService(), conf);
+                        resource,
+                        autoScalerInfo,
+                        ctx.getFlinkService(),
+                        conf,
+                        recommendedParallelisms);
         LOG.debug("Evaluating scaling metrics for {}", collectedMetrics);
         var evaluatedMetrics = evaluator.evaluate(conf, collectedMetrics);
         LOG.debug("Scaling metrics evaluated: {}", evaluatedMetrics);
         lastEvaluatedMetrics.put(resourceId, evaluatedMetrics);
-        flinkMetrics.registerScalingMetrics(() -> lastEvaluatedMetrics.get(resourceId));
+
+        flinkMetrics.registerEvaluatedScalingMetrics(
+                () -> lastEvaluatedMetrics.get(resourceId));
+        flinkMetrics.registerRecommendedParallelismMetrics(
+                () -> recommendedParallelisms.get(resourceId));
Review Comment:
I think we should pass these together, because they both need to be provided.
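
One way to enforce that, sketched with hypothetical names and simplified supplier types (the real metric-group wiring is not shown in the diff): a single registration method that takes both suppliers, so neither can be registered without the other:

```java
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical sketch: a single method accepting both suppliers together,
// replacing the two separate register* calls from the diff above.
class ScalingMetricsRegistration {
    private Supplier<Map<String, Integer>> evaluatedMetrics;
    private Supplier<Map<String, Integer>> recommendedParallelisms;

    void registerScalingMetrics(
            Supplier<Map<String, Integer>> evaluatedMetricsSupplier,
            Supplier<Map<String, Integer>> recommendedParallelismsSupplier) {
        this.evaluatedMetrics = evaluatedMetricsSupplier;
        this.recommendedParallelisms = recommendedParallelismsSupplier;
    }

    // Accessors so gauges (not shown) can read the latest values lazily.
    Map<String, Integer> currentEvaluatedMetrics() {
        return evaluatedMetrics == null ? Map.of() : evaluatedMetrics.get();
    }

    Map<String, Integer> currentRecommendedParallelisms() {
        return recommendedParallelisms == null ? Map.of() : recommendedParallelisms.get();
    }
}
```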
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]