lhotari commented on code in PR #23930:
URL: https://github.com/apache/pulsar/pull/23930#discussion_r1944340948


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/DefaultMonotonicSnapshotClock.java:
##########
@@ -45,45 +49,170 @@ public DefaultMonotonicSnapshotClock(long snapshotIntervalNanos, LongSupplier cl
         this.sleepMillis = TimeUnit.NANOSECONDS.toMillis(snapshotIntervalNanos);
         this.sleepNanos = (int) (snapshotIntervalNanos - TimeUnit.MILLISECONDS.toNanos(sleepMillis));
         this.clockSource = clockSource;
-        updateSnapshotTickNanos();
-        thread = new Thread(this::snapshotLoop, getClass().getSimpleName() + "-update-loop");
-        thread.setDaemon(true);
-        thread.start();
+        this.snapshotIntervalNanos = snapshotIntervalNanos;
+        tickUpdaterThread = new TickUpdaterThread();
+        tickUpdaterThread.start();
     }
 
     /** {@inheritDoc} */
     @Override
     public long getTickNanos(boolean requestSnapshot) {
         if (requestSnapshot) {
-            updateSnapshotTickNanos();
+            tickUpdaterThread.requestUpdate();
         }
         return snapshotTickNanos;
     }
 
-    private void updateSnapshotTickNanos() {
-        snapshotTickNanos = clockSource.getAsLong();
-    }
+    /**
+     * A thread that updates snapshotTickNanos value periodically with a configured interval.
+     * The thread is started when the DefaultMonotonicSnapshotClock is created and runs until the close method is
+     * called.
+     * A single thread is used to read the clock source value since on some hardware of virtualized platforms,
+     * System.nanoTime() isn't strictly monotonic across all CPUs. Reading by a single thread will improve the
+     * stability of the read value since a single thread is scheduled on a single CPU. If the thread is migrated
+     * to another CPU, the clock source value might leap backward or forward, but logic in this class will handle it.
+     */
+    private class TickUpdaterThread extends Thread {
+        private final Object tickUpdateDelayMonitor = new Object();
+        private final Object tickUpdatedMonitor = new Object();
+        private final long maxDelta;
+        private long referenceClockSourceValue;
+        private long baseSnapshotTickNanos;
+        private long previousSnapshotTickNanos;
+        private volatile boolean running;
+        private boolean tickUpdateDelayMonitorNotified;
+        private long requestCount;
+
+        TickUpdaterThread() {
+            super(DefaultMonotonicSnapshotClock.class.getSimpleName() + "-update-loop");
+            // set as daemon thread so that it doesn't prevent the JVM from exiting
+            setDaemon(true);
+            // set the highest priority
+            setPriority(MAX_PRIORITY);
+            this.maxDelta = 2 * snapshotIntervalNanos;
+        }
+
+        @Override
+        public void run() {
+            try {
+                running = true;
+                long updatedForRequestCount = -1;
+                while (!isInterrupted()) {
+                    try {
+                        boolean snapshotRequested = false;
+                        // sleep for the configured interval on a monitor that can be notified to stop the sleep
+                        // and update the tick value immediately. This is used in requestUpdate method.
+                        synchronized (tickUpdateDelayMonitor) {
+                            tickUpdateDelayMonitorNotified = false;
+                            // only wait if no explicit request has been made since the last update
+                            if (requestCount == updatedForRequestCount) {
+                                // if no request has been made, sleep for the configured interval
+                                tickUpdateDelayMonitor.wait(sleepMillis, sleepNanos);
+                                snapshotRequested = tickUpdateDelayMonitorNotified;
+                            }
+                            updatedForRequestCount = requestCount;
+                        }
+                        updateSnapshotTickNanos(snapshotRequested);
+                        notifyAllTickUpdated();
+                    } catch (InterruptedException e) {
+                        interrupt();
+                        break;
+                    }
+                }
+            } catch (Throwable t) {
+                // report unexpected error since this would be a fatal error when the clock doesn't progress anymore
+                // this is very unlikely to happen, but it's better to log it in any case
+                LOG.error("Unexpected fatal error that stopped the clock.", t);
+            } finally {
+                LOG.info("DefaultMonotonicSnapshotClock's TickUpdaterThread stopped. {},tid={}", this, getId());
+                running = false;
+                notifyAllTickUpdated();
+            }
+        }
+
+        private void updateSnapshotTickNanos(boolean snapshotRequested) {

Review Comment:
   I added a few tests, and with the current DefaultMonotonicSnapshotClock the 
line coverage is now 97% (73/75) and the branch coverage is 90% (18/20). Only 
2 lines and 2 branches aren't covered: lines 196 and 209, which handle 
InterruptedException. Adding test coverage for those lines isn't reasonable, 
so the maximum practical code coverage has been reached.
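
To illustrate the kind of branch in question, here is a minimal sketch of an interruptible timed wait on a monitor. It is not the PR's code: the class `InterruptibleWaitSketch`, the method `runAndInterrupt`, and the 10 ms wait are hypothetical names and values chosen for illustration. In this sketch the worker always stops after `interrupt()`, but whether the `catch` block or the loop condition ends the loop depends on timing, which is one reason such branches can be awkward to cover deterministically.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical stand-in for an update loop; NOT the PR's actual class.
class InterruptibleWaitSketch {

    /** Starts a worker that waits on a monitor, interrupts it, and reports whether it stopped. */
    static boolean runAndInterrupt() {
        final Object monitor = new Object();
        final CountDownLatch started = new CountDownLatch(1);

        Thread worker = new Thread(() -> {
            started.countDown();
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    synchronized (monitor) {
                        // timed wait: returns on timeout, notify, or spurious wakeup
                        monitor.wait(10L, 0);
                    }
                } catch (InterruptedException e) {
                    // restore the interrupt flag and leave the loop;
                    // this is the branch that is timing-dependent to reach
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }, "sketch-update-loop");
        worker.setDaemon(true);
        worker.start();

        try {
            started.await();      // make sure the worker is running
            worker.interrupt();   // request shutdown
            worker.join(5000);    // wait up to 5 seconds for it to exit
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return !worker.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("worker stopped: " + runAndInterrupt());
    }
}
```

If the interrupt arrives while the worker is inside `wait`, the `catch` block runs; if it arrives between iterations, the loop condition may observe the flag first and the `catch` block is skipped.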
   
   Based on the metrics, the complexity of the method code isn't high, since 
the methods are fairly small.
   
   <img width="1259" alt="image" 
src="https://github.com/user-attachments/assets/3bf121c1-694b-4a61-bb98-90abc6710dfd" 
/>
   
   For comparison, PersistentTopic contains methods that have high complexity 
according to the same metrics:
   <img width="1229" alt="image" 
src="https://github.com/user-attachments/assets/970976ce-4a4c-408f-9551-5de640dabfcc" 
/>
   
   The metrics were calculated with the IntelliJ plugin 
"[MetricsReloaded](https://plugins.jetbrains.com/plugin/93-metricsreloaded)".


