sharath1709 commented on code in PR #686:
URL: https://github.com/apache/flink-kubernetes-operator/pull/686#discussion_r2274810357


##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java:
##########
@@ -78,6 +88,45 @@ public static void computeDataRateMetrics(
         }
     }
 
+    private static Optional<Double> getObservedTpr(
+            Map<FlinkMetric, AggregatedMetric> flinkMetrics,
+            Map<ScalingMetric, Double> scalingMetrics,
+            double numRecordsInPerSecond,
+            Configuration conf) {
+
+        // If there are no incoming records we return infinity to allow scale down
+        if (numRecordsInPerSecond == 0) {
+            return Optional.of(Double.POSITIVE_INFINITY);
+        }
+
+        // We only measure observed tpr when we are catching up, that is when the lag is beyond the
+        // configured observe threshold
+        boolean catchingUp =
+                scalingMetrics.getOrDefault(ScalingMetric.LAG, 0.)
+                        >= conf.get(AutoScalerOptions.OBSERVE_TRUE_PROCESSING_RATE_LAG_THRESHOLD)
+                                        .toSeconds()
+                                * numRecordsInPerSecond;
+        if (!catchingUp) {
+            return Optional.empty();
+        }
+
+        double observedTpr =
+                computeObservedTprWithBackpressure(
+                        numRecordsInPerSecond,
+                        flinkMetrics.get(FlinkMetric.BACKPRESSURE_TIME_PER_SEC).getAvg());
+
+        return Double.isNaN(observedTpr) ? Optional.empty() : Optional.of(observedTpr);
+    }
+
+    public static double computeObservedTprWithBackpressure(

Review Comment:
   This is an old PR, but why is backpressure taken into account when computing the observed TPR? Since the job is already lagging, numRecordsInPerSecond itself should be the observed TPR. I understand the theoretical reason for computing it as numRecordsInPerSecond / (1 - backpressuredFraction). However, numRecordsInPerSecond shows that, for whatever reason, the job is clearly not able to process data at that higher rate.
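To make the two positions concrete, here is a minimal, hypothetical Java sketch of both estimates. The class and method names are made up for illustration and this is not the body of computeObservedTprWithBackpressure. It assumes the backpressure input is in milliseconds per second, like Flink's backPressuredTimeMsPerSecond task metric, and it returns NaN when the task is fully backpressured, chosen only because getObservedTpr in the diff treats NaN as "no observation".

```java
/**
 * Hypothetical sketch, not the autoscaler's actual code: contrasts two ways of
 * estimating the observed true processing rate (TPR) while a job is catching up.
 * Assumes backpressure is reported in ms per second (0..1000).
 */
public class ObservedTprSketch {

    /** Estimate questioned in the review: extrapolate the input rate to the non-backpressured time. */
    static double withBackpressure(double numRecordsInPerSecond, double backpressureMsPerSec) {
        // Fraction of each second spent backpressured, clamped to [0, 1].
        double backpressuredFraction =
                Math.min(1.0, Math.max(0.0, backpressureMsPerSec / 1000.0));
        if (backpressuredFraction >= 1.0) {
            // Fully backpressured: there is no unblocked time to extrapolate from.
            return Double.NaN;
        }
        return numRecordsInPerSecond / (1.0 - backpressuredFraction);
    }

    /** Alternative proposed in the review: under lag, the measured input rate is the observed TPR. */
    static double withoutBackpressure(double numRecordsInPerSecond) {
        return numRecordsInPerSecond;
    }

    public static void main(String[] args) {
        double rate = 1000.0;        // records/s actually consumed
        double backpressure = 500.0; // ms/s spent backpressured, i.e. 50%
        System.out.println(withBackpressure(rate, backpressure)); // 2000.0
        System.out.println(withoutBackpressure(rate));            // 1000.0
    }
}
```

With 50% backpressure, the first estimate doubles the consumed rate to 2000 records/s, while the reviewer's version reports the 1000 records/s the job actually sustained. The gap between the two grows without bound as backpressure approaches 100%.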



-- 
This is an automated message from the Apache Git Service.