pnowojski commented on a change in pull request #16772:
URL: https://github.com/apache/flink/pull/16772#discussion_r688356378



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/throughput/ThroughputCalculator.java
##########
@@ -36,36 +36,49 @@ public ThroughputCalculator(Clock clock, int 
numberOfSamples) {
     }
 
     public void incomingDataSize(long receivedDataSize) {
-        resumeMeasurement();
+        // Force resuming measurement.
+        if (measurementStartTime == NOT_TRACKED) {
+            measurementStartTime = clock.absoluteTimeMillis();
+        }

Review comment:
       ```
   resumeMeasurement(clock.absoluteTimeMillis());
   ```
   ?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/TimerGauge.java
##########
@@ -59,6 +59,27 @@ public synchronized void markEnd() {
         }
     }
 
+    /**
+     * Duplicate of {@link #markStart()} with ability passing the time from 
outside for possible
+     * optimization on calling {@link Clock#absoluteTimeMillis()}.
+     */
+    public synchronized void markStart(long absoluteTimeMillis) {
+        if (currentMeasurementStart == 0) {
+            currentMeasurementStart = absoluteTimeMillis;
+        }
+    }

Review comment:
       nit:
   1. make this `private void markStartUnsafe(long absoluteTimeMillis);`
   2. use this unsafe method in both `public synchronized void markEnd()` and 
`public synchronized void markStart(long absoluteTimeMillis)` to deduplicate 
the code?
   
   ditto for `markEnd`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to