mxm commented on code in PR #686:
URL: https://github.com/apache/flink-kubernetes-operator/pull/686#discussion_r1365558874


##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java:
##########
@@ -97,32 +99,30 @@ public CollectedMetricHistory updateMetrics(
                             }
                         });
 
-        // The timestamp of the first metric observation marks the start
-        // If we haven't collected any metrics, we are starting now
-        var metricCollectionStartTs = metricHistory.isEmpty() ? now : metricHistory.firstKey();
-
         var jobDetailsInfo =
                 getJobDetailsInfo(ctx, conf.get(AutoScalerOptions.FLINK_CLIENT_TIMEOUT));
         var jobUpdateTs = getJobUpdateTs(jobDetailsInfo);
-        if (jobUpdateTs.isAfter(metricCollectionStartTs)) {
+        // We detect job change compared to our collected metrics by checking against the earliest
+        // metric timestamp
+        if (!metricHistory.isEmpty() && jobUpdateTs.isAfter(metricHistory.firstKey())) {
             LOG.info("Job updated at {}. Clearing metrics.", jobUpdateTs);
             stateStore.removeEvaluatedMetrics(ctx);
             cleanup(ctx.getJobKey());
             metricHistory.clear();
-            metricCollectionStartTs = now;
         }
         var topology = getJobTopology(ctx, stateStore, jobDetailsInfo);
+        var stableTime = jobUpdateTs.plus(conf.get(AutoScalerOptions.STABILIZATION_INTERVAL));
 
-        // Trim metrics outside the metric window from metrics history
+        // Calculate timestamp when the metric windows is full
         var metricWindowSize = getMetricWindowSize(conf);
-        metricHistory.headMap(now.minus(metricWindowSize)).clear();
+        var metricsAfterStable = metricHistory.tailMap(stableTime);
+        var windowFullTime =
+                metricsAfterStable.isEmpty()
+                        ? now.plus(metricWindowSize)
+                        : metricsAfterStable.firstKey().plus(metricWindowSize);

Review Comment:
   ```suggestion
           var windowFullTime =
                   getWindowFullTime(metricHistory.tailMap(stableTime), now, metricWindowSize);
   ```
   
   ```java
       private static Instant getWindowFullTime(
               SortedMap<Instant, CollectedMetrics> metricsAfterStable,
               Instant now,
               Duration metricWindowSize) {
           return metricsAfterStable.isEmpty()
                   ? now.plus(metricWindowSize)
                   : metricsAfterStable.firstKey().plus(metricWindowSize);
       }
   
   ```
   
   This avoids confusion with the other variables, especially because of the clear in line 125: `tailMap` returns a view backed by `metricHistory`, so once that clear runs, `metricsAfterStable` is no longer usable.
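   A minimal, self-contained sketch of why keeping the view in a named variable is risky, assuming plain `TreeMap` data with `String` standing in for `CollectedMetrics` (the class name `TailMapViewDemo` is hypothetical). `SortedMap.tailMap` returns a view backed by the original map, so trimming `metricHistory` afterwards silently changes what the view contains and what the helper computes from it:

   ```java
   import java.time.Duration;
   import java.time.Instant;
   import java.util.SortedMap;
   import java.util.TreeMap;

   public class TailMapViewDemo {

       // Same logic as the suggested helper; String is a stand-in for CollectedMetrics.
       static Instant getWindowFullTime(
               SortedMap<Instant, String> metricsAfterStable, Instant now, Duration metricWindowSize) {
           return metricsAfterStable.isEmpty()
                   ? now.plus(metricWindowSize)
                   : metricsAfterStable.firstKey().plus(metricWindowSize);
       }

       public static void main(String[] args) {
           Instant t0 = Instant.ofEpochSecond(0);
           Duration window = Duration.ofSeconds(60);
           SortedMap<Instant, String> history = new TreeMap<>();
           history.put(t0.plusSeconds(10), "m1");
           history.put(t0.plusSeconds(20), "m2");
           history.put(t0.plusSeconds(30), "m3");

           // Live view over every entry at or after t0 + 15s (currently m2 and m3).
           SortedMap<Instant, String> afterStable = history.tailMap(t0.plusSeconds(15));
           System.out.println(getWindowFullTime(afterStable, t0, window)); // 1970-01-01T00:01:20Z

           // Trimming the backing map also removes m2 from the view.
           history.headMap(t0.plusSeconds(25)).clear();
           System.out.println(getWindowFullTime(afterStable, t0, window)); // 1970-01-01T00:01:30Z
       }
   }
   ```

   Passing `metricHistory.tailMap(stableTime)` directly into the helper, as suggested, evaluates the view once at the call site, so later mutations of `metricHistory` cannot change the result.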



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java:
##########
@@ -97,32 +99,30 @@ public CollectedMetricHistory updateMetrics(
                             }
                         });
 
-        // The timestamp of the first metric observation marks the start
-        // If we haven't collected any metrics, we are starting now
-        var metricCollectionStartTs = metricHistory.isEmpty() ? now : metricHistory.firstKey();
-
         var jobDetailsInfo =
                 getJobDetailsInfo(ctx, conf.get(AutoScalerOptions.FLINK_CLIENT_TIMEOUT));
         var jobUpdateTs = getJobUpdateTs(jobDetailsInfo);
-        if (jobUpdateTs.isAfter(metricCollectionStartTs)) {
+        // We detect job change compared to our collected metrics by checking against the earliest
+        // metric timestamp
+        if (!metricHistory.isEmpty() && jobUpdateTs.isAfter(metricHistory.firstKey())) {
             LOG.info("Job updated at {}. Clearing metrics.", jobUpdateTs);
             stateStore.removeEvaluatedMetrics(ctx);
             cleanup(ctx.getJobKey());
             metricHistory.clear();
-            metricCollectionStartTs = now;
         }
         var topology = getJobTopology(ctx, stateStore, jobDetailsInfo);
+        var stableTime = jobUpdateTs.plus(conf.get(AutoScalerOptions.STABILIZATION_INTERVAL));
 
-        // Trim metrics outside the metric window from metrics history
+        // Calculate timestamp when the metric windows is full
         var metricWindowSize = getMetricWindowSize(conf);
-        metricHistory.headMap(now.minus(metricWindowSize)).clear();
+        var metricsAfterStable = metricHistory.tailMap(stableTime);
+        var windowFullTime =
+                metricsAfterStable.isEmpty()
+                        ? now.plus(metricWindowSize)
+                        : metricsAfterStable.firstKey().plus(metricWindowSize);
 
-        var stableTime = jobUpdateTs.plus(conf.get(AutoScalerOptions.STABILIZATION_INTERVAL));
-        if (now.isBefore(stableTime)) {
-            // As long as we are stabilizing, collect no metrics at all
-            LOG.info("Skipping metric collection during stabilization period until {}", stableTime);
-            return new CollectedMetricHistory(topology, Collections.emptySortedMap());
-        }
+        // Trim metrics outside the metric window from metrics history
+        metricHistory.headMap(now.minus(metricWindowSize)).clear();

Review Comment:
   This introduces a bug when the metric window size is smaller than the stabilization interval. In that case, the trim cuts off metrics from the stabilization period, which we use to determine the observed true processing rate.
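   To make the failure mode concrete, here is a hedged sketch with made-up numbers: a 5-minute stabilization interval, a 3-minute metric window, and one sample per minute since the job update (`TrimBugDemo` and its methods are hypothetical). Applying the trim from the diff at `now = jobUpdateTs + 6min` drops three of the five stabilization-period samples:

   ```java
   import java.time.Duration;
   import java.time.Instant;
   import java.util.SortedMap;
   import java.util.TreeMap;

   public class TrimBugDemo {

       // Counts samples in [jobUpdateTs, stableTime), i.e. the stabilization period.
       static int stabilizationSamples(
               SortedMap<Instant, Double> history, Instant jobUpdateTs, Instant stableTime) {
           return history.subMap(jobUpdateTs, stableTime).size();
       }

       // Builds one sample per minute from jobUpdateTs up to and including now.
       static SortedMap<Instant, Double> history(Instant jobUpdateTs, Instant now) {
           SortedMap<Instant, Double> h = new TreeMap<>();
           for (Instant t = jobUpdateTs; !t.isAfter(now); t = t.plus(Duration.ofMinutes(1))) {
               h.put(t, 1.0);
           }
           return h;
       }

       public static void main(String[] args) {
           Instant jobUpdateTs = Instant.ofEpochSecond(0);
           Instant stableTime = jobUpdateTs.plus(Duration.ofMinutes(5));
           Duration metricWindowSize = Duration.ofMinutes(3); // shorter than stabilization
           Instant now = jobUpdateTs.plus(Duration.ofMinutes(6));

           SortedMap<Instant, Double> h = history(jobUpdateTs, now);
           System.out.println(stabilizationSamples(h, jobUpdateTs, stableTime)); // 5

           // Trim as in the diff: drop everything older than now - metricWindowSize.
           h.headMap(now.minus(metricWindowSize)).clear();
           System.out.println(stabilizationSamples(h, jobUpdateTs, stableTime)); // 2
       }
   }
   ```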



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java:
##########
@@ -154,6 +153,56 @@ private Map<ScalingMetric, EvaluatedScalingMetric> evaluateMetrics(
         return evaluatedMetrics;
     }
 
+    private static EvaluatedScalingMetric evaluateTpr(
+            SortedMap<Instant, CollectedMetrics> metricsHistory,
+            JobVertexID vertex,
+            Map<ScalingMetric, Double> latestVertexMetrics,
+            Configuration conf) {
+
+        var busyTimeTprAvg = getAverage(TRUE_PROCESSING_RATE, vertex, metricsHistory);
+        var observedTprAvg =
+                getAverage(
+                        OBSERVED_TPR,
+                        vertex,
+                        metricsHistory,
+                        conf.get(AutoScalerOptions.OBSERVED_TPR_MIN_OBSERVATIONS));
+
+        var tprMetric = selectTprMetric(vertex, conf, busyTimeTprAvg, observedTprAvg);
+        return new EvaluatedScalingMetric(
+                latestVertexMetrics.getOrDefault(tprMetric, Double.NaN),
+                tprMetric == OBSERVED_TPR ? observedTprAvg : busyTimeTprAvg);
+    }
+
+    private static ScalingMetric selectTprMetric(
+            JobVertexID jobVertexID,
+            Configuration conf,
+            double busyTimeTprAvg,
+            double observedTprAvg) {
+
+        if (Double.isInfinite(busyTimeTprAvg) || Double.isNaN(busyTimeTprAvg)) {
+            return OBSERVED_TPR;
+        }
+
+        if (Double.isNaN(observedTprAvg)) {
+            return TRUE_PROCESSING_RATE;
+        }
+
+        double switchThreshold = conf.get(AutoScalerOptions.OBSERVED_TPR_SWITCH_THRESHOLD);
+        // If we could measure the observed tpr we decide whether to switch to using it
+        // instead of busy time based on the error / difference between the two
+        if (busyTimeTprAvg > observedTprAvg * (1 + switchThreshold)) {

Review Comment:
   Should we add a config-based switch here to turn this on / off?
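   One possible shape for such a switch, as a sketch only: the boolean `observedTprEnabled` is a hypothetical flag, not an existing `AutoScalerOptions` key, and the final comparison assumes the branch truncated in the diff above returns `OBSERVED_TPR` when the busy-time TPR exceeds the observed TPR by more than the threshold. The nested enum is a local stand-in for the autoscaler's `ScalingMetric`, and `TprSelectionSketch` is a hypothetical name.

   ```java
   public class TprSelectionSketch {

       // Local stand-in for the autoscaler's ScalingMetric enum.
       enum ScalingMetric { TRUE_PROCESSING_RATE, OBSERVED_TPR }

       static ScalingMetric selectTprMetric(
               boolean observedTprEnabled, // hypothetical on/off switch
               double switchThreshold,
               double busyTimeTprAvg,
               double observedTprAvg) {
           if (!observedTprEnabled) {
               // Feature switched off: always use the busy-time based TPR.
               return ScalingMetric.TRUE_PROCESSING_RATE;
           }
           if (Double.isInfinite(busyTimeTprAvg) || Double.isNaN(busyTimeTprAvg)) {
               return ScalingMetric.OBSERVED_TPR;
           }
           if (Double.isNaN(observedTprAvg)) {
               return ScalingMetric.TRUE_PROCESSING_RATE;
           }
           // Assumed: switch when busy-time TPR exceeds observed TPR by more than the threshold.
           return busyTimeTprAvg > observedTprAvg * (1 + switchThreshold)
                   ? ScalingMetric.OBSERVED_TPR
                   : ScalingMetric.TRUE_PROCESSING_RATE;
       }
   }
   ```

   With the flag off, this sketch returns `TRUE_PROCESSING_RATE` even when the busy-time average is NaN or infinite; whether the switch should still allow that fallback is left open here.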



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java:
##########
@@ -97,32 +99,30 @@ public CollectedMetricHistory updateMetrics(
                             }
                         });
 
-        // The timestamp of the first metric observation marks the start
-        // If we haven't collected any metrics, we are starting now
-        var metricCollectionStartTs = metricHistory.isEmpty() ? now : metricHistory.firstKey();
-
         var jobDetailsInfo =
                 getJobDetailsInfo(ctx, conf.get(AutoScalerOptions.FLINK_CLIENT_TIMEOUT));
         var jobUpdateTs = getJobUpdateTs(jobDetailsInfo);
-        if (jobUpdateTs.isAfter(metricCollectionStartTs)) {
+        // We detect job change compared to our collected metrics by checking against the earliest
+        // metric timestamp
+        if (!metricHistory.isEmpty() && jobUpdateTs.isAfter(metricHistory.firstKey())) {
             LOG.info("Job updated at {}. Clearing metrics.", jobUpdateTs);
             stateStore.removeEvaluatedMetrics(ctx);
             cleanup(ctx.getJobKey());
             metricHistory.clear();
-            metricCollectionStartTs = now;
         }
         var topology = getJobTopology(ctx, stateStore, jobDetailsInfo);
+        var stableTime = jobUpdateTs.plus(conf.get(AutoScalerOptions.STABILIZATION_INTERVAL));
 
-        // Trim metrics outside the metric window from metrics history
+        // Calculate timestamp when the metric windows is full
         var metricWindowSize = getMetricWindowSize(conf);
-        metricHistory.headMap(now.minus(metricWindowSize)).clear();
+        var metricsAfterStable = metricHistory.tailMap(stableTime);
+        var windowFullTime =
+                metricsAfterStable.isEmpty()
+                        ? now.plus(metricWindowSize)
+                        : metricsAfterStable.firstKey().plus(metricWindowSize);
 
-        var stableTime = jobUpdateTs.plus(conf.get(AutoScalerOptions.STABILIZATION_INTERVAL));
-        if (now.isBefore(stableTime)) {
-            // As long as we are stabilizing, collect no metrics at all
-            LOG.info("Skipping metric collection during stabilization period until {}", stableTime);
-            return new CollectedMetricHistory(topology, Collections.emptySortedMap());
-        }
+        // Trim metrics outside the metric window from metrics history
+        metricHistory.headMap(now.minus(metricWindowSize)).clear();

Review Comment:
   ```suggestion
           metricHistory.headMap(jobUpdateTs).clear();
   ```
   
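   This should fix it: only metrics recorded before the last job update are dropped, so the metrics from the stabilization period are kept.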
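   A small sketch checking the suggested trim under assumed data (one sample per minute starting two minutes before the job update, and a 5-minute stabilization interval; `TrimFixDemo` is a hypothetical name): only the samples older than `jobUpdateTs` are removed, and all five stabilization-period samples survive.

   ```java
   import java.time.Duration;
   import java.time.Instant;
   import java.util.SortedMap;
   import java.util.TreeMap;

   public class TrimFixDemo {

       // Suggested trim: drop only metrics recorded before the last job update.
       static void trim(SortedMap<Instant, Double> metricHistory, Instant jobUpdateTs) {
           metricHistory.headMap(jobUpdateTs).clear();
       }

       public static void main(String[] args) {
           Instant jobUpdateTs = Instant.ofEpochSecond(600);
           Instant stableTime = jobUpdateTs.plus(Duration.ofMinutes(5));
           SortedMap<Instant, Double> h = new TreeMap<>();
           // One sample per minute, from 2 minutes before to 6 minutes after the job update.
           for (int m = -2; m <= 6; m++) {
               h.put(jobUpdateTs.plus(Duration.ofMinutes(m)), 1.0);
           }

           trim(h, jobUpdateTs);
           System.out.println(h.firstKey().equals(jobUpdateTs)); // true
           System.out.println(h.subMap(jobUpdateTs, stableTime).size()); // 5
       }
   }
   ```

   Note that `headMap(jobUpdateTs)` on its own no longer bounds the history by `metricWindowSize`; the sketch only checks that the stabilization-period samples are kept.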



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
