mxm commented on code in PR #686: URL: https://github.com/apache/flink-kubernetes-operator/pull/686#discussion_r1365558874
########## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java: ########## @@ -97,32 +99,30 @@ public CollectedMetricHistory updateMetrics( } }); - // The timestamp of the first metric observation marks the start - // If we haven't collected any metrics, we are starting now - var metricCollectionStartTs = metricHistory.isEmpty() ? now : metricHistory.firstKey(); - var jobDetailsInfo = getJobDetailsInfo(ctx, conf.get(AutoScalerOptions.FLINK_CLIENT_TIMEOUT)); var jobUpdateTs = getJobUpdateTs(jobDetailsInfo); - if (jobUpdateTs.isAfter(metricCollectionStartTs)) { + // We detect job change compared to our collected metrics by checking against the earliest + // metric timestamp + if (!metricHistory.isEmpty() && jobUpdateTs.isAfter(metricHistory.firstKey())) { LOG.info("Job updated at {}. Clearing metrics.", jobUpdateTs); stateStore.removeEvaluatedMetrics(ctx); cleanup(ctx.getJobKey()); metricHistory.clear(); - metricCollectionStartTs = now; } var topology = getJobTopology(ctx, stateStore, jobDetailsInfo); + var stableTime = jobUpdateTs.plus(conf.get(AutoScalerOptions.STABILIZATION_INTERVAL)); - // Trim metrics outside the metric window from metrics history + // Calculate timestamp when the metric windows is full var metricWindowSize = getMetricWindowSize(conf); - metricHistory.headMap(now.minus(metricWindowSize)).clear(); + var metricsAfterStable = metricHistory.tailMap(stableTime); + var windowFullTime = + metricsAfterStable.isEmpty() + ? now.plus(metricWindowSize) + : metricsAfterStable.firstKey().plus(metricWindowSize); Review Comment: ```suggestion var windowFullTime = getWindowFullTime(metricHistory.tailMap(stableTime), now, metricWindowSize); ``` ```java private static Instant getWindowFullTime(SortedMap<Instant, CollectedMetrics> metricsAfterStable, Instant now, Duration metricWindowSize) { return metricsAfterStable.isEmpty() ? 
now.plus(metricWindowSize) : metricsAfterStable.firstKey().plus(metricWindowSize); } ``` This avoids confusion with the other variables, especially because the clear in line 125 makes `metricsAfterStable` unusable. ########## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java: ########## @@ -97,32 +99,30 @@ public CollectedMetricHistory updateMetrics( } }); - // The timestamp of the first metric observation marks the start - // If we haven't collected any metrics, we are starting now - var metricCollectionStartTs = metricHistory.isEmpty() ? now : metricHistory.firstKey(); - var jobDetailsInfo = getJobDetailsInfo(ctx, conf.get(AutoScalerOptions.FLINK_CLIENT_TIMEOUT)); var jobUpdateTs = getJobUpdateTs(jobDetailsInfo); - if (jobUpdateTs.isAfter(metricCollectionStartTs)) { + // We detect job change compared to our collected metrics by checking against the earliest + // metric timestamp + if (!metricHistory.isEmpty() && jobUpdateTs.isAfter(metricHistory.firstKey())) { LOG.info("Job updated at {}. Clearing metrics.", jobUpdateTs); stateStore.removeEvaluatedMetrics(ctx); cleanup(ctx.getJobKey()); metricHistory.clear(); - metricCollectionStartTs = now; } var topology = getJobTopology(ctx, stateStore, jobDetailsInfo); + var stableTime = jobUpdateTs.plus(conf.get(AutoScalerOptions.STABILIZATION_INTERVAL)); - // Trim metrics outside the metric window from metrics history + // Calculate timestamp when the metric windows is full var metricWindowSize = getMetricWindowSize(conf); - metricHistory.headMap(now.minus(metricWindowSize)).clear(); + var metricsAfterStable = metricHistory.tailMap(stableTime); + var windowFullTime = + metricsAfterStable.isEmpty() + ? 
now.plus(metricWindowSize) + : metricsAfterStable.firstKey().plus(metricWindowSize); - var stableTime = jobUpdateTs.plus(conf.get(AutoScalerOptions.STABILIZATION_INTERVAL)); - if (now.isBefore(stableTime)) { - // As long as we are stabilizing, collect no metrics at all - LOG.info("Skipping metric collection during stabilization period until {}", stableTime); - return new CollectedMetricHistory(topology, Collections.emptySortedMap()); - } + // Trim metrics outside the metric window from metrics history + metricHistory.headMap(now.minus(metricWindowSize)).clear(); Review Comment: This introduces a bug in case the metric window size is smaller than the stabilization period. In this case we will cut off metrics from the stabilization period which we use for determining observed true processing rate. ########## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java: ########## @@ -154,6 +153,56 @@ private Map<ScalingMetric, EvaluatedScalingMetric> evaluateMetrics( return evaluatedMetrics; } + private static EvaluatedScalingMetric evaluateTpr( + SortedMap<Instant, CollectedMetrics> metricsHistory, + JobVertexID vertex, + Map<ScalingMetric, Double> latestVertexMetrics, + Configuration conf) { + + var busyTimeTprAvg = getAverage(TRUE_PROCESSING_RATE, vertex, metricsHistory); + var observedTprAvg = + getAverage( + OBSERVED_TPR, + vertex, + metricsHistory, + conf.get(AutoScalerOptions.OBSERVED_TPR_MIN_OBSERVATIONS)); + + var tprMetric = selectTprMetric(vertex, conf, busyTimeTprAvg, observedTprAvg); + return new EvaluatedScalingMetric( + latestVertexMetrics.getOrDefault(tprMetric, Double.NaN), + tprMetric == OBSERVED_TPR ? 
observedTprAvg : busyTimeTprAvg); + } + + private static ScalingMetric selectTprMetric( + JobVertexID jobVertexID, + Configuration conf, + double busyTimeTprAvg, + double observedTprAvg) { + + if (Double.isInfinite(busyTimeTprAvg) || Double.isNaN(busyTimeTprAvg)) { + return OBSERVED_TPR; + } + + if (Double.isNaN(observedTprAvg)) { + return TRUE_PROCESSING_RATE; + } + + double switchThreshold = conf.get(AutoScalerOptions.OBSERVED_TPR_SWITCH_THRESHOLD); + // If we could measure the observed tpr we decide whether to switch to using it + // instead of busy time based on the error / difference between the two + if (busyTimeTprAvg > observedTprAvg * (1 + switchThreshold)) { Review Comment: Should we add a config-based switch here to turn this on / off? ########## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java: ########## @@ -97,32 +99,30 @@ public CollectedMetricHistory updateMetrics( } }); - // The timestamp of the first metric observation marks the start - // If we haven't collected any metrics, we are starting now - var metricCollectionStartTs = metricHistory.isEmpty() ? now : metricHistory.firstKey(); - var jobDetailsInfo = getJobDetailsInfo(ctx, conf.get(AutoScalerOptions.FLINK_CLIENT_TIMEOUT)); var jobUpdateTs = getJobUpdateTs(jobDetailsInfo); - if (jobUpdateTs.isAfter(metricCollectionStartTs)) { + // We detect job change compared to our collected metrics by checking against the earliest + // metric timestamp + if (!metricHistory.isEmpty() && jobUpdateTs.isAfter(metricHistory.firstKey())) { LOG.info("Job updated at {}. 
Clearing metrics.", jobUpdateTs); stateStore.removeEvaluatedMetrics(ctx); cleanup(ctx.getJobKey()); metricHistory.clear(); - metricCollectionStartTs = now; } var topology = getJobTopology(ctx, stateStore, jobDetailsInfo); + var stableTime = jobUpdateTs.plus(conf.get(AutoScalerOptions.STABILIZATION_INTERVAL)); - // Trim metrics outside the metric window from metrics history + // Calculate timestamp when the metric windows is full var metricWindowSize = getMetricWindowSize(conf); - metricHistory.headMap(now.minus(metricWindowSize)).clear(); + var metricsAfterStable = metricHistory.tailMap(stableTime); + var windowFullTime = + metricsAfterStable.isEmpty() + ? now.plus(metricWindowSize) + : metricsAfterStable.firstKey().plus(metricWindowSize); - var stableTime = jobUpdateTs.plus(conf.get(AutoScalerOptions.STABILIZATION_INTERVAL)); - if (now.isBefore(stableTime)) { - // As long as we are stabilizing, collect no metrics at all - LOG.info("Skipping metric collection during stabilization period until {}", stableTime); - return new CollectedMetricHistory(topology, Collections.emptySortedMap()); - } + // Trim metrics outside the metric window from metrics history + metricHistory.headMap(now.minus(metricWindowSize)).clear(); Review Comment: ```suggestion metricHistory.headMap(jobUpdateTs).clear(); ``` This should fix it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org