sharath1709 commented on code in PR #686: URL: https://github.com/apache/flink-kubernetes-operator/pull/686#discussion_r2274810357
########## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java: ########## @@ -78,6 +88,45 @@ public static void computeDataRateMetrics( } } + private static Optional<Double> getObservedTpr( + Map<FlinkMetric, AggregatedMetric> flinkMetrics, + Map<ScalingMetric, Double> scalingMetrics, + double numRecordsInPerSecond, + Configuration conf) { + + // If there are no incoming records we return infinity to allow scale down + if (numRecordsInPerSecond == 0) { + return Optional.of(Double.POSITIVE_INFINITY); + } + + // We only measure observed tpr when we are catching up, that is when the lag is beyond the + // configured observe threshold + boolean catchingUp = + scalingMetrics.getOrDefault(ScalingMetric.LAG, 0.) + >= conf.get(AutoScalerOptions.OBSERVE_TRUE_PROCESSING_RATE_LAG_THRESHOLD) + .toSeconds() + * numRecordsInPerSecond; + if (!catchingUp) { + return Optional.empty(); + } + + double observedTpr = + computeObservedTprWithBackpressure( + numRecordsInPerSecond, + flinkMetrics.get(FlinkMetric.BACKPRESSURE_TIME_PER_SEC).getAvg()); + + return Double.isNaN(observedTpr) ? Optional.empty() : Optional.of(observedTpr); + } + + public static double computeObservedTprWithBackpressure( Review Comment: This is an old PR but why is backpressure taken into account to compute ObservedTpr? NumRecordsInPerSecond it self should be the observedTpr since the job is already under lag. I understand the theoretical reason for computing it as numRecordsInPerSecond/(1 - backpressuredFraction). However, clearly the job is not able to process the data at this rate as evidenced by numRecordsInPerSecond due to whatever reason. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org