sharath1709 commented on code in PR #686:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/686#discussion_r2294715672


##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java:
##########
@@ -78,6 +88,45 @@ public static void computeDataRateMetrics(
         }
     }
 
+    private static Optional<Double> getObservedTpr(
+            Map<FlinkMetric, AggregatedMetric> flinkMetrics,
+            Map<ScalingMetric, Double> scalingMetrics,
+            double numRecordsInPerSecond,
+            Configuration conf) {
+
+        // If there are no incoming records we return infinity to allow scale 
down
+        if (numRecordsInPerSecond == 0) {
+            return Optional.of(Double.POSITIVE_INFINITY);
+        }
+
+        // We only measure observed tpr when we are catching up, that is when 
the lag is beyond the
+        // configured observe threshold
+        boolean catchingUp =
+                scalingMetrics.getOrDefault(ScalingMetric.LAG, 0.)
+                        >= 
conf.get(AutoScalerOptions.OBSERVE_TRUE_PROCESSING_RATE_LAG_THRESHOLD)
+                                        .toSeconds()
+                                * numRecordsInPerSecond;
+        if (!catchingUp) {
+            return Optional.empty();
+        }
+
+        double observedTpr =
+                computeObservedTprWithBackpressure(
+                        numRecordsInPerSecond,
+                        
flinkMetrics.get(FlinkMetric.BACKPRESSURE_TIME_PER_SEC).getAvg());
+
+        return Double.isNaN(observedTpr) ? Optional.empty() : 
Optional.of(observedTpr);
+    }
+
+    public static double computeObservedTprWithBackpressure(

Review Comment:
   Agreed. I will move the discussion to JIRA/mailing list



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to