rdhabalia commented on code in PR #23930:
URL: https://github.com/apache/pulsar/pull/23930#discussion_r1943667929
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/DefaultMonotonicSnapshotClock.java:
##########
@@ -45,45 +49,170 @@ public DefaultMonotonicSnapshotClock(long
snapshotIntervalNanos, LongSupplier cl
this.sleepMillis =
TimeUnit.NANOSECONDS.toMillis(snapshotIntervalNanos);
this.sleepNanos = (int) (snapshotIntervalNanos -
TimeUnit.MILLISECONDS.toNanos(sleepMillis));
this.clockSource = clockSource;
- updateSnapshotTickNanos();
- thread = new Thread(this::snapshotLoop, getClass().getSimpleName() +
"-update-loop");
- thread.setDaemon(true);
- thread.start();
+ this.snapshotIntervalNanos = snapshotIntervalNanos;
+ tickUpdaterThread = new TickUpdaterThread();
+ tickUpdaterThread.start();
}
/** {@inheritDoc} */
@Override
public long getTickNanos(boolean requestSnapshot) {
if (requestSnapshot) {
- updateSnapshotTickNanos();
+ tickUpdaterThread.requestUpdate();
}
return snapshotTickNanos;
}
- private void updateSnapshotTickNanos() {
- snapshotTickNanos = clockSource.getAsLong();
- }
+ /**
+ * A thread that updates snapshotTickNanos value periodically with a
configured interval.
+ * The thread is started when the DefaultMonotonicSnapshotClock is created
and runs until the close method is
+ * called.
+ * A single thread is used to read the clock source value since on some
hardware of virtualized platforms,
+ * System.nanoTime() isn't strictly monotonic across all CPUs. Reading by
a single thread will improve the
+ * stability of the read value since a single thread is scheduled on a
single CPU. If the thread is migrated
+ * to another CPU, the clock source value might leap backward or forward,
but logic in this class will handle it.
+ */
+ private class TickUpdaterThread extends Thread {
+ private final Object tickUpdateDelayMonitor = new Object();
+ private final Object tickUpdatedMonitor = new Object();
+ private final long maxDelta;
+ private long referenceClockSourceValue;
+ private long baseSnapshotTickNanos;
+ private long previousSnapshotTickNanos;
+ private volatile boolean running;
+ private boolean tickUpdateDelayMonitorNotified;
+ private long requestCount;
+
+ TickUpdaterThread() {
+ super(DefaultMonotonicSnapshotClock.class.getSimpleName() +
"-update-loop");
+ // set as daemon thread so that it doesn't prevent the JVM from
exiting
+ setDaemon(true);
+ // set the highest priority
+ setPriority(MAX_PRIORITY);
+ this.maxDelta = 2 * snapshotIntervalNanos;
+ }
+
+ @Override
+ public void run() {
+ try {
+ running = true;
+ long updatedForRequestCount = -1;
+ while (!isInterrupted()) {
+ try {
+ boolean snapshotRequested = false;
+ // sleep for the configured interval on a monitor that
can be notified to stop the sleep
+ // and update the tick value immediately. This is used
in requestUpdate method.
+ synchronized (tickUpdateDelayMonitor) {
+ tickUpdateDelayMonitorNotified = false;
+ // only wait if no explicit request has been made
since the last update
+ if (requestCount == updatedForRequestCount) {
+ // if no request has been made, sleep for the
configured interval
+ tickUpdateDelayMonitor.wait(sleepMillis,
sleepNanos);
+ snapshotRequested =
tickUpdateDelayMonitorNotified;
+ }
+ updatedForRequestCount = requestCount;
+ }
+ updateSnapshotTickNanos(snapshotRequested);
+ notifyAllTickUpdated();
+ } catch (InterruptedException e) {
+ interrupt();
+ break;
+ }
+ }
+ } catch (Throwable t) {
+ // report unexpected error since this would be a fatal error
when the clock doesn't progress anymore
+ // this is very unlikely to happen, but it's better to log it
in any case
+ LOG.error("Unexpected fatal error that stopped the clock.", t);
+ } finally {
+ LOG.info("DefaultMonotonicSnapshotClock's TickUpdaterThread
stopped. {},tid={}", this, getId());
+ running = false;
+ notifyAllTickUpdated();
+ }
+ }
+
+ private void updateSnapshotTickNanos(boolean snapshotRequested) {
+ long clockValue = clockSource.getAsLong();
+
+ // Initialization
+ if (referenceClockSourceValue == 0) {
+ referenceClockSourceValue = clockValue;
+ baseSnapshotTickNanos = clockValue;
+ snapshotTickNanos = clockValue;
+ previousSnapshotTickNanos = clockValue;
+ return;
+ }
+
+ // calculate the duration since the reference clock source value
+ // so that the snapshot value is always increasing and tolerates
it when the clock source is not strictly
+ // monotonic across all CPUs and leaps backward or forward
+ long durationSinceReference = clockValue -
referenceClockSourceValue;
+ long newSnapshotTickNanos = baseSnapshotTickNanos +
durationSinceReference;
+
+ // reset the reference clock source value if the clock source
value leaps backward or forward
+ if (newSnapshotTickNanos < previousSnapshotTickNanos - maxDelta
+ || newSnapshotTickNanos > previousSnapshotTickNanos +
maxDelta) {
+ referenceClockSourceValue = clockValue;
+ baseSnapshotTickNanos = previousSnapshotTickNanos;
+ if (!snapshotRequested) {
+ // if the snapshot value is not requested, increment by
the snapshot interval
+ baseSnapshotTickNanos += snapshotIntervalNanos;
+ }
+ newSnapshotTickNanos = baseSnapshotTickNanos;
+ }
+
+ // update snapshotTickNanos value if the new value is greater than
the previous value
+ if (newSnapshotTickNanos > previousSnapshotTickNanos) {
+ snapshotTickNanos = newSnapshotTickNanos;
+ // store into a field so that we don't need to do a volatile
read to find out the previous value
+ previousSnapshotTickNanos = newSnapshotTickNanos;
+ }
+ }
+
+ private void notifyAllTickUpdated() {
+ synchronized (tickUpdatedMonitor) {
+ // notify all threads that are waiting for the tick value to
be updated
+ tickUpdatedMonitor.notifyAll();
+ }
+ }
+
+ public void requestUpdate() {
+ if (!running) {
+ // thread has stopped running, fallback to update the value
directly without any optimizations
+ snapshotTickNanos = clockSource.getAsLong();
+ return;
+ }
+ synchronized (tickUpdatedMonitor) {
+ // notify the thread to stop waiting and update the tick value
+ synchronized (tickUpdateDelayMonitor) {
+ tickUpdateDelayMonitorNotified = true;
+ requestCount++;
+ tickUpdateDelayMonitor.notify();
+ }
+ // wait until the tick value has been updated
+ try {
+ tickUpdatedMonitor.wait();
+ } catch (InterruptedException e) {
+ currentThread().interrupt();
+ }
+ }
+ }
- private void snapshotLoop() {
- try {
- while (!Thread.currentThread().isInterrupted()) {
- updateSnapshotTickNanos();
+ @Override
+ public synchronized void start() {
+ super.start();
+ // wait until the thread is started and the tick value has been
updated
+ synchronized (tickUpdatedMonitor) {
try {
- Thread.sleep(sleepMillis, sleepNanos);
+ tickUpdatedMonitor.wait();
} catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- break;
+ currentThread().interrupt();
}
}
- } catch (Throwable t) {
- // report unexpected error since this would be a fatal error when
the clock doesn't progress anymore
- // this is very unlikely to happen, but it's better to log it in
any case
- LOG.error("Unexpected fatal error that stopped the clock.", t);
}
}
@Override
public void close() {
- thread.interrupt();
+ tickUpdaterThread.interrupt();
Review Comment:
can we terminate while loop with some variable `stop` and update it here.
instead of depending on interrupting as it can lead leak if one changes the
interrupt logic in future in previous method.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/DefaultMonotonicSnapshotClock.java:
##########
@@ -45,45 +49,170 @@ public DefaultMonotonicSnapshotClock(long
snapshotIntervalNanos, LongSupplier cl
this.sleepMillis =
TimeUnit.NANOSECONDS.toMillis(snapshotIntervalNanos);
this.sleepNanos = (int) (snapshotIntervalNanos -
TimeUnit.MILLISECONDS.toNanos(sleepMillis));
this.clockSource = clockSource;
- updateSnapshotTickNanos();
- thread = new Thread(this::snapshotLoop, getClass().getSimpleName() +
"-update-loop");
- thread.setDaemon(true);
- thread.start();
+ this.snapshotIntervalNanos = snapshotIntervalNanos;
+ tickUpdaterThread = new TickUpdaterThread();
+ tickUpdaterThread.start();
}
/** {@inheritDoc} */
@Override
public long getTickNanos(boolean requestSnapshot) {
if (requestSnapshot) {
- updateSnapshotTickNanos();
+ tickUpdaterThread.requestUpdate();
}
return snapshotTickNanos;
}
- private void updateSnapshotTickNanos() {
- snapshotTickNanos = clockSource.getAsLong();
- }
+ /**
+ * A thread that updates snapshotTickNanos value periodically with a
configured interval.
+ * The thread is started when the DefaultMonotonicSnapshotClock is created
and runs until the close method is
+ * called.
+ * A single thread is used to read the clock source value since on some
hardware of virtualized platforms,
+ * System.nanoTime() isn't strictly monotonic across all CPUs. Reading by
a single thread will improve the
+ * stability of the read value since a single thread is scheduled on a
single CPU. If the thread is migrated
+ * to another CPU, the clock source value might leap backward or forward,
but logic in this class will handle it.
+ */
+ private class TickUpdaterThread extends Thread {
+ private final Object tickUpdateDelayMonitor = new Object();
+ private final Object tickUpdatedMonitor = new Object();
+ private final long maxDelta;
+ private long referenceClockSourceValue;
+ private long baseSnapshotTickNanos;
+ private long previousSnapshotTickNanos;
+ private volatile boolean running;
+ private boolean tickUpdateDelayMonitorNotified;
+ private long requestCount;
+
+ TickUpdaterThread() {
+ super(DefaultMonotonicSnapshotClock.class.getSimpleName() +
"-update-loop");
+ // set as daemon thread so that it doesn't prevent the JVM from
exiting
+ setDaemon(true);
+ // set the highest priority
+ setPriority(MAX_PRIORITY);
+ this.maxDelta = 2 * snapshotIntervalNanos;
+ }
+
+ @Override
+ public void run() {
+ try {
+ running = true;
+ long updatedForRequestCount = -1;
+ while (!isInterrupted()) {
+ try {
+ boolean snapshotRequested = false;
+ // sleep for the configured interval on a monitor that
can be notified to stop the sleep
+ // and update the tick value immediately. This is used
in requestUpdate method.
+ synchronized (tickUpdateDelayMonitor) {
+ tickUpdateDelayMonitorNotified = false;
+ // only wait if no explicit request has been made
since the last update
+ if (requestCount == updatedForRequestCount) {
+ // if no request has been made, sleep for the
configured interval
+ tickUpdateDelayMonitor.wait(sleepMillis,
sleepNanos);
+ snapshotRequested =
tickUpdateDelayMonitorNotified;
+ }
+ updatedForRequestCount = requestCount;
+ }
+ updateSnapshotTickNanos(snapshotRequested);
+ notifyAllTickUpdated();
+ } catch (InterruptedException e) {
+ interrupt();
+ break;
+ }
+ }
+ } catch (Throwable t) {
+ // report unexpected error since this would be a fatal error
when the clock doesn't progress anymore
+ // this is very unlikely to happen, but it's better to log it
in any case
+ LOG.error("Unexpected fatal error that stopped the clock.", t);
+ } finally {
+ LOG.info("DefaultMonotonicSnapshotClock's TickUpdaterThread
stopped. {},tid={}", this, getId());
+ running = false;
+ notifyAllTickUpdated();
+ }
+ }
+
+ private void updateSnapshotTickNanos(boolean snapshotRequested) {
+ long clockValue = clockSource.getAsLong();
+
+ // Initialization
+ if (referenceClockSourceValue == 0) {
+ referenceClockSourceValue = clockValue;
+ baseSnapshotTickNanos = clockValue;
+ snapshotTickNanos = clockValue;
+ previousSnapshotTickNanos = clockValue;
+ return;
+ }
+
+ // calculate the duration since the reference clock source value
+ // so that the snapshot value is always increasing and tolerates
it when the clock source is not strictly
+ // monotonic across all CPUs and leaps backward or forward
+ long durationSinceReference = clockValue -
referenceClockSourceValue;
+ long newSnapshotTickNanos = baseSnapshotTickNanos +
durationSinceReference;
+
+ // reset the reference clock source value if the clock source
value leaps backward or forward
+ if (newSnapshotTickNanos < previousSnapshotTickNanos - maxDelta
+ || newSnapshotTickNanos > previousSnapshotTickNanos +
maxDelta) {
+ referenceClockSourceValue = clockValue;
+ baseSnapshotTickNanos = previousSnapshotTickNanos;
+ if (!snapshotRequested) {
+ // if the snapshot value is not requested, increment by
the snapshot interval
+ baseSnapshotTickNanos += snapshotIntervalNanos;
Review Comment:
here, we assume that `updateSnapshotTickNanos` gets invoked at every
`snapshotIntervalNanos` time so, we are adding it to `baseSnapshotTickNanos`.
instead of if we pass `snapshotIntervalNanos` as a variable to this method
then method could be less error prone and can be also used in unit-test if
needed.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterImpl.java:
##########
@@ -134,7 +135,11 @@ private void
unthrottleQueuedProducers(ScheduledExecutorService executor) {
// unthrottle as many producers as possible while there are token
available
while ((throttlingDuration = calculateThrottlingDurationNanos())
== 0L
&& (producer = unthrottlingQueue.poll()) != null) {
- producer.decrementThrottleCount();
+ try {
+ producer.decrementThrottleCount();
+ } catch (Exception e) {
+ log.error("Failed to unthrottle producer {}", producer, e);
Review Comment:
worth to add topic name here
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/DefaultMonotonicSnapshotClock.java:
##########
@@ -45,45 +49,170 @@ public DefaultMonotonicSnapshotClock(long
snapshotIntervalNanos, LongSupplier cl
this.sleepMillis =
TimeUnit.NANOSECONDS.toMillis(snapshotIntervalNanos);
this.sleepNanos = (int) (snapshotIntervalNanos -
TimeUnit.MILLISECONDS.toNanos(sleepMillis));
this.clockSource = clockSource;
- updateSnapshotTickNanos();
- thread = new Thread(this::snapshotLoop, getClass().getSimpleName() +
"-update-loop");
- thread.setDaemon(true);
- thread.start();
+ this.snapshotIntervalNanos = snapshotIntervalNanos;
+ tickUpdaterThread = new TickUpdaterThread();
+ tickUpdaterThread.start();
}
/** {@inheritDoc} */
@Override
public long getTickNanos(boolean requestSnapshot) {
if (requestSnapshot) {
- updateSnapshotTickNanos();
+ tickUpdaterThread.requestUpdate();
}
return snapshotTickNanos;
}
- private void updateSnapshotTickNanos() {
- snapshotTickNanos = clockSource.getAsLong();
- }
+ /**
+ * A thread that updates snapshotTickNanos value periodically with a
configured interval.
+ * The thread is started when the DefaultMonotonicSnapshotClock is created
and runs until the close method is
+ * called.
+ * A single thread is used to read the clock source value since on some
hardware of virtualized platforms,
+ * System.nanoTime() isn't strictly monotonic across all CPUs. Reading by
a single thread will improve the
+ * stability of the read value since a single thread is scheduled on a
single CPU. If the thread is migrated
+ * to another CPU, the clock source value might leap backward or forward,
but logic in this class will handle it.
+ */
+ private class TickUpdaterThread extends Thread {
+ private final Object tickUpdateDelayMonitor = new Object();
+ private final Object tickUpdatedMonitor = new Object();
+ private final long maxDelta;
+ private long referenceClockSourceValue;
+ private long baseSnapshotTickNanos;
+ private long previousSnapshotTickNanos;
+ private volatile boolean running;
+ private boolean tickUpdateDelayMonitorNotified;
+ private long requestCount;
+
+ TickUpdaterThread() {
+ super(DefaultMonotonicSnapshotClock.class.getSimpleName() +
"-update-loop");
+ // set as daemon thread so that it doesn't prevent the JVM from
exiting
+ setDaemon(true);
+ // set the highest priority
+ setPriority(MAX_PRIORITY);
+ this.maxDelta = 2 * snapshotIntervalNanos;
+ }
+
+ @Override
+ public void run() {
+ try {
+ running = true;
+ long updatedForRequestCount = -1;
+ while (!isInterrupted()) {
+ try {
+ boolean snapshotRequested = false;
+ // sleep for the configured interval on a monitor that
can be notified to stop the sleep
+ // and update the tick value immediately. This is used
in requestUpdate method.
+ synchronized (tickUpdateDelayMonitor) {
+ tickUpdateDelayMonitorNotified = false;
+ // only wait if no explicit request has been made
since the last update
+ if (requestCount == updatedForRequestCount) {
+ // if no request has been made, sleep for the
configured interval
+ tickUpdateDelayMonitor.wait(sleepMillis,
sleepNanos);
+ snapshotRequested =
tickUpdateDelayMonitorNotified;
+ }
+ updatedForRequestCount = requestCount;
+ }
+ updateSnapshotTickNanos(snapshotRequested);
+ notifyAllTickUpdated();
+ } catch (InterruptedException e) {
+ interrupt();
+ break;
+ }
+ }
+ } catch (Throwable t) {
+ // report unexpected error since this would be a fatal error
when the clock doesn't progress anymore
+ // this is very unlikely to happen, but it's better to log it
in any case
+ LOG.error("Unexpected fatal error that stopped the clock.", t);
+ } finally {
+ LOG.info("DefaultMonotonicSnapshotClock's TickUpdaterThread
stopped. {},tid={}", this, getId());
+ running = false;
+ notifyAllTickUpdated();
+ }
+ }
+
+ private void updateSnapshotTickNanos(boolean snapshotRequested) {
Review Comment:
it would be great if we can add unit-test for this method and try to get max
code-coverage from unit-test as we can see high cyclomatic complexity in this
method code.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/DefaultMonotonicSnapshotClock.java:
##########
@@ -45,45 +49,170 @@ public DefaultMonotonicSnapshotClock(long
snapshotIntervalNanos, LongSupplier cl
this.sleepMillis =
TimeUnit.NANOSECONDS.toMillis(snapshotIntervalNanos);
this.sleepNanos = (int) (snapshotIntervalNanos -
TimeUnit.MILLISECONDS.toNanos(sleepMillis));
this.clockSource = clockSource;
- updateSnapshotTickNanos();
- thread = new Thread(this::snapshotLoop, getClass().getSimpleName() +
"-update-loop");
- thread.setDaemon(true);
- thread.start();
+ this.snapshotIntervalNanos = snapshotIntervalNanos;
+ tickUpdaterThread = new TickUpdaterThread();
+ tickUpdaterThread.start();
}
/** {@inheritDoc} */
@Override
public long getTickNanos(boolean requestSnapshot) {
if (requestSnapshot) {
- updateSnapshotTickNanos();
+ tickUpdaterThread.requestUpdate();
}
return snapshotTickNanos;
}
- private void updateSnapshotTickNanos() {
- snapshotTickNanos = clockSource.getAsLong();
- }
+ /**
+ * A thread that updates snapshotTickNanos value periodically with a
configured interval.
+ * The thread is started when the DefaultMonotonicSnapshotClock is created
and runs until the close method is
+ * called.
+ * A single thread is used to read the clock source value since on some
hardware of virtualized platforms,
+ * System.nanoTime() isn't strictly monotonic across all CPUs. Reading by
a single thread will improve the
+ * stability of the read value since a single thread is scheduled on a
single CPU. If the thread is migrated
+ * to another CPU, the clock source value might leap backward or forward,
but logic in this class will handle it.
+ */
+ private class TickUpdaterThread extends Thread {
+ private final Object tickUpdateDelayMonitor = new Object();
+ private final Object tickUpdatedMonitor = new Object();
+ private final long maxDelta;
+ private long referenceClockSourceValue;
+ private long baseSnapshotTickNanos;
+ private long previousSnapshotTickNanos;
+ private volatile boolean running;
+ private boolean tickUpdateDelayMonitorNotified;
+ private long requestCount;
+
+ TickUpdaterThread() {
+ super(DefaultMonotonicSnapshotClock.class.getSimpleName() +
"-update-loop");
+ // set as daemon thread so that it doesn't prevent the JVM from
exiting
+ setDaemon(true);
+ // set the highest priority
+ setPriority(MAX_PRIORITY);
+ this.maxDelta = 2 * snapshotIntervalNanos;
+ }
+
+ @Override
+ public void run() {
+ try {
+ running = true;
+ long updatedForRequestCount = -1;
+ while (!isInterrupted()) {
+ try {
+ boolean snapshotRequested = false;
+ // sleep for the configured interval on a monitor that
can be notified to stop the sleep
+ // and update the tick value immediately. This is used
in requestUpdate method.
+ synchronized (tickUpdateDelayMonitor) {
+ tickUpdateDelayMonitorNotified = false;
+ // only wait if no explicit request has been made
since the last update
+ if (requestCount == updatedForRequestCount) {
+ // if no request has been made, sleep for the
configured interval
+ tickUpdateDelayMonitor.wait(sleepMillis,
sleepNanos);
+ snapshotRequested =
tickUpdateDelayMonitorNotified;
+ }
+ updatedForRequestCount = requestCount;
+ }
+ updateSnapshotTickNanos(snapshotRequested);
+ notifyAllTickUpdated();
+ } catch (InterruptedException e) {
+ interrupt();
+ break;
+ }
+ }
+ } catch (Throwable t) {
+ // report unexpected error since this would be a fatal error
when the clock doesn't progress anymore
+ // this is very unlikely to happen, but it's better to log it
in any case
+ LOG.error("Unexpected fatal error that stopped the clock.", t);
+ } finally {
+ LOG.info("DefaultMonotonicSnapshotClock's TickUpdaterThread
stopped. {},tid={}", this, getId());
+ running = false;
+ notifyAllTickUpdated();
+ }
+ }
+
+ private void updateSnapshotTickNanos(boolean snapshotRequested) {
+ long clockValue = clockSource.getAsLong();
+
+ // Initialization
+ if (referenceClockSourceValue == 0) {
+ referenceClockSourceValue = clockValue;
+ baseSnapshotTickNanos = clockValue;
+ snapshotTickNanos = clockValue;
+ previousSnapshotTickNanos = clockValue;
+ return;
+ }
+
+ // calculate the duration since the reference clock source value
+ // so that the snapshot value is always increasing and tolerates
it when the clock source is not strictly
+ // monotonic across all CPUs and leaps backward or forward
+ long durationSinceReference = clockValue -
referenceClockSourceValue;
+ long newSnapshotTickNanos = baseSnapshotTickNanos +
durationSinceReference;
+
+ // reset the reference clock source value if the clock source
value leaps backward or forward
+ if (newSnapshotTickNanos < previousSnapshotTickNanos - maxDelta
+ || newSnapshotTickNanos > previousSnapshotTickNanos +
maxDelta) {
Review Comment:
btw, can we document what exactly we are doing with `baseSnapshotTickNanos`
to explain the logic here as it's little hard to understand what is happening
here?
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/DefaultMonotonicSnapshotClock.java:
##########
@@ -45,45 +49,170 @@ public DefaultMonotonicSnapshotClock(long
snapshotIntervalNanos, LongSupplier cl
this.sleepMillis =
TimeUnit.NANOSECONDS.toMillis(snapshotIntervalNanos);
this.sleepNanos = (int) (snapshotIntervalNanos -
TimeUnit.MILLISECONDS.toNanos(sleepMillis));
this.clockSource = clockSource;
- updateSnapshotTickNanos();
- thread = new Thread(this::snapshotLoop, getClass().getSimpleName() +
"-update-loop");
- thread.setDaemon(true);
- thread.start();
+ this.snapshotIntervalNanos = snapshotIntervalNanos;
+ tickUpdaterThread = new TickUpdaterThread();
+ tickUpdaterThread.start();
}
/** {@inheritDoc} */
@Override
public long getTickNanos(boolean requestSnapshot) {
if (requestSnapshot) {
- updateSnapshotTickNanos();
+ tickUpdaterThread.requestUpdate();
}
return snapshotTickNanos;
}
- private void updateSnapshotTickNanos() {
- snapshotTickNanos = clockSource.getAsLong();
- }
+ /**
+ * A thread that updates snapshotTickNanos value periodically with a
configured interval.
+ * The thread is started when the DefaultMonotonicSnapshotClock is created
and runs until the close method is
+ * called.
+ * A single thread is used to read the clock source value since on some
hardware of virtualized platforms,
+ * System.nanoTime() isn't strictly monotonic across all CPUs. Reading by
a single thread will improve the
+ * stability of the read value since a single thread is scheduled on a
single CPU. If the thread is migrated
+ * to another CPU, the clock source value might leap backward or forward,
but logic in this class will handle it.
+ */
+ private class TickUpdaterThread extends Thread {
+ private final Object tickUpdateDelayMonitor = new Object();
+ private final Object tickUpdatedMonitor = new Object();
+ private final long maxDelta;
+ private long referenceClockSourceValue;
+ private long baseSnapshotTickNanos;
+ private long previousSnapshotTickNanos;
+ private volatile boolean running;
+ private boolean tickUpdateDelayMonitorNotified;
+ private long requestCount;
+
+ TickUpdaterThread() {
+ super(DefaultMonotonicSnapshotClock.class.getSimpleName() +
"-update-loop");
+ // set as daemon thread so that it doesn't prevent the JVM from
exiting
+ setDaemon(true);
+ // set the highest priority
+ setPriority(MAX_PRIORITY);
+ this.maxDelta = 2 * snapshotIntervalNanos;
+ }
+
+ @Override
+ public void run() {
+ try {
+ running = true;
+ long updatedForRequestCount = -1;
+ while (!isInterrupted()) {
+ try {
+ boolean snapshotRequested = false;
+ // sleep for the configured interval on a monitor that
can be notified to stop the sleep
+ // and update the tick value immediately. This is used
in requestUpdate method.
+ synchronized (tickUpdateDelayMonitor) {
+ tickUpdateDelayMonitorNotified = false;
+ // only wait if no explicit request has been made
since the last update
+ if (requestCount == updatedForRequestCount) {
+ // if no request has been made, sleep for the
configured interval
+ tickUpdateDelayMonitor.wait(sleepMillis,
sleepNanos);
+ snapshotRequested =
tickUpdateDelayMonitorNotified;
+ }
+ updatedForRequestCount = requestCount;
+ }
+ updateSnapshotTickNanos(snapshotRequested);
+ notifyAllTickUpdated();
+ } catch (InterruptedException e) {
+ interrupt();
+ break;
+ }
+ }
+ } catch (Throwable t) {
+ // report unexpected error since this would be a fatal error
when the clock doesn't progress anymore
+ // this is very unlikely to happen, but it's better to log it
in any case
+ LOG.error("Unexpected fatal error that stopped the clock.", t);
+ } finally {
+ LOG.info("DefaultMonotonicSnapshotClock's TickUpdaterThread
stopped. {},tid={}", this, getId());
+ running = false;
+ notifyAllTickUpdated();
+ }
+ }
+
+ private void updateSnapshotTickNanos(boolean snapshotRequested) {
+ long clockValue = clockSource.getAsLong();
+
+ // Initialization
+ if (referenceClockSourceValue == 0) {
+ referenceClockSourceValue = clockValue;
+ baseSnapshotTickNanos = clockValue;
+ snapshotTickNanos = clockValue;
+ previousSnapshotTickNanos = clockValue;
+ return;
+ }
+
+ // calculate the duration since the reference clock source value
+ // so that the snapshot value is always increasing and tolerates
it when the clock source is not strictly
+ // monotonic across all CPUs and leaps backward or forward
+ long durationSinceReference = clockValue -
referenceClockSourceValue;
+ long newSnapshotTickNanos = baseSnapshotTickNanos +
durationSinceReference;
+
+ // reset the reference clock source value if the clock source
value leaps backward or forward
+ if (newSnapshotTickNanos < previousSnapshotTickNanos - maxDelta
Review Comment:
now, as we are anyway, using single thread update, then can't we just avoid
this complex check, and just directly use `System.nanoTime()` retrieving from
the single CPU and use that incremented count?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]