divijvaidya commented on code in PR #12045:
URL: https://github.com/apache/kafka/pull/12045#discussion_r897838652
##########
clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java:
##########
@@ -34,35 +35,56 @@
*/
public abstract class SampledStat implements MeasurableStat {
- private double initialValue;
+ private final double initialValue;
+ /**
+ * Index of the latest stored sample.
+ */
private int current = 0;
+ /**
+ * Stores the recorded samples in a ring buffer.
+ */
protected List<Sample> samples;
public SampledStat(double initialValue) {
this.initialValue = initialValue;
this.samples = new ArrayList<>(2);
}
+ /**
+ * {@inheritDoc}
+ *
+ * On every record, do the following:
+ * 1. Check if the current window has expired
+ * 2. If yes, then advance the current pointer to new window. The start time of the new window is set to nearest
+ * possible starting point for the new window. The nearest starting point occurs at config.timeWindowMs intervals
+ * from the end time of last known window.
+ * 3. Update the recorded value for the current window
+ * 4. Increase the number of event count
+ */
@Override
- public void record(MetricConfig config, double value, long timeMs) {
- Sample sample = current(timeMs);
- if (sample.isComplete(timeMs, config))
- sample = advance(config, timeMs);
- update(sample, config, value, timeMs);
- sample.eventCount += 1;
+ public void record(MetricConfig config, double value, long recordingTimeMs) {
+ Sample sample = current(recordingTimeMs);
+ if (sample.isComplete(recordingTimeMs, config)) {
+ final long previousWindowStartTime = sample.lastWindowMs;
+ final long previousWindowEndtime = previousWindowStartTime + config.timeWindowMs();
+ final long startTimeOfNewWindow = recordingTimeMs - ((recordingTimeMs - previousWindowEndtime) % config.timeWindowMs());
Review Comment:
That is a great observation, Tom! Ideally, recording a metric should never
block, because the operation is sensitive to wall-clock time. But as you
observed, we use `synchronized` in multiple places, so a thread can be delayed
after it has read the time, and its sample may then be recorded in a window
that has already completed.
When the `sensor` is used for calculating the ConnectionQuota, this problem
does not occur, because `Time.milliseconds` is called inside a `synchronized`
block [1], which ensures that only one thread, holding the latest timestamp,
calls `sensor.record` at a time. I don't know about code paths other than
ConnectionQuota that use the sensor, though, so your observation is valid.
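
To illustrate the pattern described above, here is a minimal sketch, not the
actual Kafka code. The class `SynchronizedRecorderSketch` and its fake clock
are hypothetical; the only point is that the timestamp is read and the sample
is stored while holding the same lock, so no thread can store a timestamp
older than one already stored.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

// Hypothetical stand-in for a sensor; not part of Kafka.
final class SynchronizedRecorderSketch {
    private final LongSupplier clock;
    private final List<Long> recordedTimesMs = new ArrayList<>();

    SynchronizedRecorderSketch(LongSupplier clock) {
        this.clock = clock;
    }

    // The clock is read and the sample is stored under the same lock,
    // so samples are stored in the order their timestamps were taken.
    synchronized void record() {
        long nowMs = clock.getAsLong();
        recordedTimesMs.add(nowMs);
    }

    synchronized int count() {
        return recordedTimesMs.size();
    }

    // True if no stored timestamp is older than the one stored before it.
    synchronized boolean isNonDecreasing() {
        for (int i = 1; i < recordedTimesMs.size(); i++) {
            if (recordedTimesMs.get(i) < recordedTimesMs.get(i - 1)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) throws InterruptedException {
        // Assumption: a monotonically increasing fake clock replaces wall-clock time.
        AtomicLong fakeClock = new AtomicLong();
        SynchronizedRecorderSketch recorder =
            new SynchronizedRecorderSketch(fakeClock::incrementAndGet);

        Thread[] threads = new Thread[4];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < 1000; j++) {
                    recorder.record();
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            t.join();
        }
        System.out.println(recorder.count() + " samples, non-decreasing: "
            + recorder.isNonDecreasing());
    }
}
```

If the clock were read before entering the lock instead, a thread could be
delayed between reading the time and storing the sample, which is the
situation Tom pointed out.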
Since this problem is independent of this code change, and already breaks the
existing logic whenever recordingTimeMs < endTimeOfPreviousWindow, I have
created a JIRA to address it in a separate PR:
https://issues.apache.org/jira/browse/KAFKA-13994
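
As a worked example of the arithmetic in the diff, the sketch below applies
the same formula to plain numbers. The class and method names are
hypothetical, and it only shows what the expression evaluates to; whether the
branch is actually reached with a stale timestamp depends on
`Sample.isComplete`, which is not reproduced here.

```java
// Hypothetical helper that mirrors the startTimeOfNewWindow expression from the diff.
final class WindowStartSketch {
    static long startOfNewWindow(long recordingTimeMs,
                                 long previousWindowStartMs,
                                 long timeWindowMs) {
        long previousWindowEndMs = previousWindowStartMs + timeWindowMs;
        // Java's % takes the sign of the dividend, so the remainder is
        // negative when recordingTimeMs < previousWindowEndMs.
        return recordingTimeMs - ((recordingTimeMs - previousWindowEndMs) % timeWindowMs);
    }

    public static void main(String[] args) {
        // Previous window is [0, 1000). A record at 2500 falls in the window starting at 2000.
        System.out.println(startOfNewWindow(2500, 0, 1000)); // 2000

        // Stale timestamp 700 < 1000: (700 - 1000) % 1000 == -300, so the
        // computed start is 700 + 300 = 1000, which is later than the sample itself.
        System.out.println(startOfNewWindow(700, 0, 1000)); // 1000
    }
}
```

In the second call the computed window start lies after the recording time,
which is one way the logic can misbehave when recordingTimeMs <
endTimeOfPreviousWindow.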
[1]
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/network/SocketServer.scala#L1541-L1542