mxm commented on code in PR #628:
URL: https://github.com/apache/flink-kubernetes-operator/pull/628#discussion_r1259539220
##########
flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java:
##########
@@ -223,16 +220,26 @@ private void updateKafkaSourceMaxParallelisms(
      * Given a map of collected Flink vertex metrics we compute the scaling metrics for each job
      * vertex.
      *
-     * @param collectedMetrics Collected metrics for all job vertices.
+     * @param flinkMetrics Collected metrics for all job vertices.
      * @return Computed scaling metrics for all job vertices.
      */
     private CollectedMetrics convertToScalingMetrics(
             ResourceID resourceID,
-            Map<JobVertexID, Map<FlinkMetric, AggregatedMetric>> collectedMetrics,
+            Map<JobVertexID, Map<FlinkMetric, AggregatedMetric>> flinkMetrics,
Review Comment:
```suggestion
            Map<JobVertexID, Map<FlinkMetric, AggregatedMetric>> collectedMetrics,
```
Together with my comment above.
##########
flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java:
##########
@@ -313,41 +320,37 @@ protected Map<JobVertexID, Map<String, FlinkMetric>> queryFilteredMetricNames(
         var jobId = JobID.fromHexString(cr.getStatus().getJobStatus().getJobId());
         var vertices = topology.getVerticesInTopologicalOrder();
-        long deployedGeneration = getDeployedGeneration(cr);
-
-        var previousMetricNames = availableVertexMetricNames.get(ResourceID.fromResource(cr));
-
-        if (previousMetricNames != null) {
-            if (deployedGeneration == previousMetricNames.f0) {
-                // We have already gathered the metric names for this spec, no need to query again
-                return previousMetricNames.f1;
-            } else {
-                availableVertexMetricNames.remove(ResourceID.fromResource(cr));
-            }
-        }
-
-        try (var restClient = (RestClusterClient<String>) flinkService.getClusterClient(conf)) {
-            var names =
-                    vertices.stream()
-                            .collect(
-                                    Collectors.toMap(
-                                            v -> v,
-                                            v ->
-                                                    getFilteredVertexMetricNames(
-                                                            restClient, jobId, v, topology)));
-            availableVertexMetricNames.put(
-                    ResourceID.fromResource(cr), Tuple2.of(deployedGeneration, names));
-            return names;
-        }
-    }
-
-    public static long getDeployedGeneration(AbstractFlinkResource<?, ?> cr) {
-        return cr.getStatus()
-                .getReconciliationStatus()
-                .deserializeLastReconciledSpecWithMeta()
-                .getMeta()
-                .getMetadata()
-                .getGeneration();
+        var resourceId = ResourceID.fromResource(cr);
+        var names =
+                availableVertexMetricNames.compute(
+                        resourceId,
+                        (k, previousMetricNames) -> {
+                            if (previousMetricNames != null
+                                    && previousMetricNames
+                                            .keySet()
+                                            .equals(topology.getParallelisms().keySet())) {
+                                // We have already gathered the metric names for this topology
+                                return previousMetricNames;
Review Comment:
Could it be that metrics are added late and we are missing some in the first attempt?
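To illustrate the concern: if the first query runs while the job's metrics are still registering, the cache would pin an incomplete name set for as long as the topology is unchanged. A possible guard, purely a sketch and not part of this PR (it assumes the cached value is a `Map<JobVertexID, Map<String, FlinkMetric>>` and that an empty inner map means a vertex's metrics were not available yet), would be to reuse the cache only when every vertex resolved at least one name:

```java
// Sketch only: reuse the cached names only if the topology is unchanged AND
// every vertex resolved at least one metric name on the previous attempt.
if (previousMetricNames != null
        && previousMetricNames.keySet().equals(topology.getParallelisms().keySet())
        && previousMetricNames.values().stream().noneMatch(Map::isEmpty)) {
    // We already gathered a complete set of metric names for this topology.
    return previousMetricNames;
}
```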
##########
flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/utils/AutoScalerUtils.java:
##########
@@ -67,4 +72,18 @@ public static double getTargetProcessingCapacity(
         return Math.round(lagCatchupTargetRate + restartCatchupRate + inputTargetAtUtilization);
     }
+
+    /** Temporarily exclude vertex from scaling for this run. This does not update the spec. */
+    public static boolean excludeVertexFromScaling(Configuration conf, JobVertexID jobVertexId) {
+        Set<String> excludedIds = new HashSet<>(conf.get(AutoScalerOptions.VERTEX_EXCLUDE_IDS));
+        boolean added = excludedIds.add(jobVertexId.toHexString());
+        conf.set(AutoScalerOptions.VERTEX_EXCLUDE_IDS, new ArrayList<>(excludedIds));
+        return added;
+    }
Review Comment:
```suggestion
    public static boolean excludeVertexFromScaling(Configuration conf, JobVertexID jobVertexId) {
        return excludeVerticesFromScaling(conf, Collections.singletonList(jobVertexId));
    }
```
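The suggestion delegates to a collection-based `excludeVerticesFromScaling` that is not shown in this hunk; its exact name and boolean return are assumptions here. A minimal sketch of what such a helper could look like, generalizing the original method body:

```java
/**
 * Temporarily exclude the given vertices from scaling for this run. This does not update the
 * spec. (Sketch only: this signature is an assumption, not code from this PR.)
 */
public static boolean excludeVerticesFromScaling(
        Configuration conf, Collection<JobVertexID> jobVertexIds) {
    Set<String> excludedIds = new HashSet<>(conf.get(AutoScalerOptions.VERTEX_EXCLUDE_IDS));
    boolean anyAdded = false;
    for (JobVertexID jobVertexId : jobVertexIds) {
        // Set#add returns true only if the id was not excluded before.
        anyAdded |= excludedIds.add(jobVertexId.toHexString());
    }
    conf.set(AutoScalerOptions.VERTEX_EXCLUDE_IDS, new ArrayList<>(excludedIds));
    return anyAdded;
}
```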
##########
flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java:
##########
@@ -223,16 +220,26 @@ private void updateKafkaSourceMaxParallelisms(
      * Given a map of collected Flink vertex metrics we compute the scaling metrics for each job
      * vertex.
      *
-     * @param collectedMetrics Collected metrics for all job vertices.
+     * @param flinkMetrics Collected metrics for all job vertices.
      * @return Computed scaling metrics for all job vertices.
      */
     private CollectedMetrics convertToScalingMetrics(
             ResourceID resourceID,
-            Map<JobVertexID, Map<FlinkMetric, AggregatedMetric>> collectedMetrics,
+            Map<JobVertexID, Map<FlinkMetric, AggregatedMetric>> flinkMetrics,
             JobTopology jobTopology,
             Configuration conf) {
         var out = new HashMap<JobVertexID, Map<ScalingMetric, Double>>();
+
+        var finishedVertices = jobTopology.getFinishedVertices();
+        Map<JobVertexID, Map<FlinkMetric, AggregatedMetric>> collectedMetrics;
+        if (finishedVertices.isEmpty()) {
+            collectedMetrics = flinkMetrics;
+        } else {
+            collectedMetrics = new HashMap<>(flinkMetrics);
+            finishedVertices.forEach(v -> collectedMetrics.put(v, FlinkMetric.FINISHED_METRICS));
+        }
Review Comment:
```suggestion
        if (!finishedVertices.isEmpty()) {
            collectedMetrics = new HashMap<>(collectedMetrics);
            for (var v : finishedVertices) {
                collectedMetrics.put(v, FlinkMetric.FINISHED_METRICS);
            }
        }
```
I would prefer this over adding an extra variable because it is easier for me to read.
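For reference, with this suggestion applied (and the parameter rename reverted per the comment above), the start of the method would read roughly like this sketch; the rest of the body is elided:

```java
private CollectedMetrics convertToScalingMetrics(
        ResourceID resourceID,
        Map<JobVertexID, Map<FlinkMetric, AggregatedMetric>> collectedMetrics,
        JobTopology jobTopology,
        Configuration conf) {
    var out = new HashMap<JobVertexID, Map<ScalingMetric, Double>>();

    var finishedVertices = jobTopology.getFinishedVertices();
    if (!finishedVertices.isEmpty()) {
        // Copy before mutating so the caller's map stays untouched. A plain loop is
        // used here because the reassigned parameter could not be captured in a
        // forEach lambda (it is no longer effectively final).
        collectedMetrics = new HashMap<>(collectedMetrics);
        for (var v : finishedVertices) {
            collectedMetrics.put(v, FlinkMetric.FINISHED_METRICS);
        }
    }
    // ... rest of the method unchanged ...
```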
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]