mxm commented on code in PR #726:
URL: https://github.com/apache/flink-kubernetes-operator/pull/726#discussion_r1422277362


##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java:
##########
@@ -222,6 +222,18 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) {
                     .withDescription(
                             "Processing rate increase threshold for detecting ineffective scaling threshold. 0.1 means if we do not accomplish at least 10% of the desired capacity increase with scaling, the action is marked ineffective.");
 

Review Comment:
   I'm wondering whether we should add an on/off switch in case there are any issues with the TM metric collection.
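   A minimal sketch of what such a switch could look like, written in plain Java so it stays self-contained. The key `memory.tm-metrics.enabled`, the `Map<String, String>` config, and the `TmMetricsToggle` class are hypothetical stand-ins for a real Flink `ConfigOption<Boolean>`; none of them come from this PR. When the switch is off, the TM query is skipped and callers get an empty map.

```java
import java.util.Collections;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical sketch: the key name and the Map-based config stand in for a
// real Flink ConfigOption<Boolean>; neither is part of PR #726.
final class TmMetricsToggle {
    static final String TM_METRICS_ENABLED_KEY = "memory.tm-metrics.enabled"; // hypothetical key
    static final boolean TM_METRICS_ENABLED_DEFAULT = true;

    /** Reads the switch, falling back to the default when the key is absent. */
    static boolean isEnabled(Map<String, String> conf) {
        String raw = conf.get(TM_METRICS_ENABLED_KEY);
        return raw == null ? TM_METRICS_ENABLED_DEFAULT : Boolean.parseBoolean(raw.trim());
    }

    /**
     * Runs the TM metric query only when the switch is on; otherwise returns an
     * empty map so the memory-based checks have nothing to act on.
     */
    static <K, V> Map<K, V> queryTmMetricsIfEnabled(
            Map<String, String> conf, Supplier<Map<K, V>> query) {
        return isEnabled(conf) ? query.get() : Collections.<K, V>emptyMap();
    }
}
```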



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/RestApiMetricsCollector.java:
##########
@@ -78,19 +91,87 @@ protected Map<FlinkMetric, AggregatedMetric> queryAggregatedVertexMetrics(
                                     EmptyRequestBody.getInstance())
                             .get();
 
-            return responseBody.getMetrics().stream()
-                    .collect(
-                            Collectors.toMap(
-                                    m -> metrics.get(m.getId()),
-                                    m -> m,
-                                    (m1, m2) ->
-                                            new AggregatedMetric(
-                                                    m1.getId() + " merged with " + m2.getId(),
-                                                    Math.min(m1.getMin(), m2.getMin()),
-                                                    Math.max(m1.getMax(), m2.getMax()),
-                                                    // Average can't be computed
-                                                    Double.NaN,
-                                                    m1.getSum() + m2.getSum())));
+            return aggregateByFlinkMetric(metrics, responseBody);
         }
     }
+
+    protected Map<FlinkMetric, AggregatedMetric> queryTmMetrics(Context ctx) throws Exception {
+        try (var restClient = ctx.getRestClusterClient()) {
+            var metricNames = getTmMetricNames(restClient, ctx);
+            var metricNameMapping = new HashMap<String, FlinkMetric>();
+
+            REQUIRED_TM_METRICS.forEach(
+                    fm -> {
+                        var name =
+                                fm.findAny(metricNames)
+                                        .orElseThrow(
+                                                () ->
+                                                        new RuntimeException(
+                                                                "Could not find required TM metric "
+                                                                        + fm.name()));
+                        metricNameMapping.put(name, fm);
+                    });

Review Comment:
   I'm a little concerned that these sequential (non-parallel) requests might take too long for deployments with hundreds of TaskManagers, especially because this feature cannot be turned off.
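   One way to bound the latency is to fan the per-TaskManager requests out on a small thread pool and join them afterwards. The sketch below assumes a generic `fetch` function standing in for a single REST call per TaskManager; `ParallelTmFetch` and its parameters are hypothetical and do not model `RestClusterClient`.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.stream.Collectors;

// Sketch only: `fetch` stands in for one REST call per TaskManager; the real
// collector in the PR uses RestClusterClient, which is not modelled here.
final class ParallelTmFetch {

    /**
     * Issues one request per TaskManager concurrently on a bounded pool and
     * waits for all of them. TaskManager IDs must be unique and `fetch` must
     * not return null (Collectors.toMap rejects both).
     */
    static <R> Map<String, R> fetchAll(
            List<String> tmIds, Function<String, R> fetch, int poolSize) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try {
            // Submit every request before waiting on any of them.
            Map<String, CompletableFuture<R>> futures = tmIds.stream()
                    .collect(Collectors.toMap(
                            id -> id,
                            id -> CompletableFuture.supplyAsync(() -> fetch.apply(id), pool)));
            // join() rethrows a per-TM failure wrapped in a CompletionException.
            return futures.entrySet().stream()
                    .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().join()));
        } finally {
            pool.shutdown();
        }
    }
}
```

   With a pool of size `p` and similar per-request latency, wall-clock time is roughly `ceil(n / p)` round trips for `n` TaskManagers instead of `n`. The pool size also caps how many concurrent requests hit the REST endpoint.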



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java:
##########
@@ -222,6 +222,18 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) {
                     .withDescription(
                             "Processing rate increase threshold for detecting ineffective scaling threshold. 0.1 means if we do not accomplish at least 10% of the desired capacity increase with scaling, the action is marked ineffective.");
 
+    public static final ConfigOption<Double> GC_PRESSURE_THRESHOLD =
+            autoScalerConfig("memory.gc-pressure.threshold")
+                    .doubleType()
+                    .defaultValue(0.3)
+                    .withDescription("Max allowed GC pressure during scaling operations");
+
+    public static final ConfigOption<Double> HEAP_USAGE_THRESHOLD =
+            autoScalerConfig("memory.heap-usage.threshold")
+                    .doubleType()
+                    .defaultValue(0.9)
+                    .withDescription("Max allowed Heap usage during scaling operations");

Review Comment:
   ```suggestion
                       .withDescription("Max allowed percentage of heap usage during scaling operations. Autoscaling will be paused if the heap usage exceeds this threshold.");
   ```
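   To make the suggested wording concrete, the check could compare heap usage as a fraction (used bytes over max bytes, so the default `0.9` means 90%) against the threshold. This is a hypothetical sketch: `HeapUsageCheck` and the choice to pause based on the worst TaskManager are assumptions, not the PR's implementation.

```java
// Hypothetical sketch: assumes per-TM heap "used" and "max" byte counts are
// already collected; the class name and the worst-TM policy are illustrative,
// not taken from PR #726.
final class HeapUsageCheck {
    static final double HEAP_USAGE_THRESHOLD_DEFAULT = 0.9; // default from the diff

    /** Heap usage as a fraction: used bytes divided by max bytes. */
    static double heapUsage(long usedBytes, long maxBytes) {
        if (maxBytes <= 0) {
            throw new IllegalArgumentException("maxBytes must be positive");
        }
        return (double) usedBytes / maxBytes;
    }

    /** Autoscaling should pause when any TaskManager is above the threshold. */
    static boolean shouldPause(long[] usedBytes, long[] maxBytes, double threshold) {
        if (usedBytes.length != maxBytes.length) {
            throw new IllegalArgumentException("one used/max pair per TaskManager");
        }
        double worst = 0.0;
        for (int i = 0; i < usedBytes.length; i++) {
            worst = Math.max(worst, heapUsage(usedBytes[i], maxBytes[i]));
        }
        return worst > threshold;
    }
}
```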



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java:
##########
@@ -222,6 +222,18 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) {
                     .withDescription(
                             "Processing rate increase threshold for detecting ineffective scaling threshold. 0.1 means if we do not accomplish at least 10% of the desired capacity increase with scaling, the action is marked ineffective.");
 
+    public static final ConfigOption<Double> GC_PRESSURE_THRESHOLD =
+            autoScalerConfig("memory.gc-pressure.threshold")
+                    .doubleType()
+                    .defaultValue(0.3)
+                    .withDescription("Max allowed GC pressure during scaling operations");

Review Comment:
   ```suggestion
                       .withDescription("Max allowed GC pressure (percentage spent garbage collecting) during scaling operations. Autoscaling will be paused if the GC pressure exceeds this limit.");
   ```
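   Similarly, "percentage spent garbage collecting" could be measured as the share of wall-clock time spent in GC between two samples of a cumulative GC-time counter, so the default `0.3` means 30% of the window. The sketch assumes millisecond counters and timestamps; `GcPressureCheck` and its parameters are hypothetical.

```java
// Hypothetical sketch: GC pressure is taken here as the share of elapsed
// wall-clock time spent in GC between two samples of a cumulative GC-time
// counter; names and sampling scheme are assumptions, not PR #726 code.
final class GcPressureCheck {
    static final double GC_PRESSURE_THRESHOLD_DEFAULT = 0.3; // default from the diff

    /** Fraction of the window spent collecting garbage, clamped to [0, 1]. */
    static double gcPressure(
            long gcTimeStartMs, long gcTimeEndMs, long windowStartMs, long windowEndMs) {
        long elapsed = windowEndMs - windowStartMs;
        if (elapsed <= 0) {
            throw new IllegalArgumentException("window must have positive length");
        }
        double ratio = (double) (gcTimeEndMs - gcTimeStartMs) / elapsed;
        return Math.min(1.0, Math.max(0.0, ratio));
    }

    /** Autoscaling should pause when GC pressure exceeds the threshold. */
    static boolean shouldPause(double gcPressure, double threshold) {
        return gcPressure > threshold;
    }
}
```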



-- 
This is an automated message from the Apache Git Service.