Copilot commented on code in PR #10544:
URL: https://github.com/apache/rocketmq/pull/10544#discussion_r3458468303


##########
common/src/main/java/org/apache/rocketmq/common/ServiceThread.java:
##########
@@ -97,32 +97,49 @@ public void makeStop() {
             return;
         }
         this.stopped = true;
+        // wake up the parked worker so it observes the stop flag promptly
+        wakeup();
         log.info("makestop thread[{}] ", this.getServiceName());
     }
 
     public void wakeup() {
+        if (hasNotified.get()) {
+            return;
+        }
         if (hasNotified.compareAndSet(false, true)) {
-            waitPoint.countDown(); // notify
+            LockSupport.unpark(this.thread); // notify
         }
     }
 
     protected void waitForRunning(long interval) {
+        // Publish the parking thread so wakeup() can target it (also handles 
restart).
+        this.thread = Thread.currentThread();
+
         if (hasNotified.compareAndSet(true, false)) {

Review Comment:
   `waitForRunning()` overwrites `this.thread` with the calling thread. If 
`thread` is also used for lifecycle management elsewhere (e.g., `start()` sets 
it and `shutdown()`/`join()` reads it), then calling `waitForRunning()` from 
any non-service thread can make `wakeup()` unpark the wrong thread, make 
shutdown join the wrong thread, or both. A safer approach is to keep `thread` 
as the dedicated service-thread reference and introduce a separate `volatile 
Thread parkingThread`, updated in `waitForRunning()`, as the target of 
`LockSupport.unpark(...)`. Alternatively, ensure `thread` is only ever 
assigned in `start()`.
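
A minimal sketch of the separate-field idea, assuming a single consumer thread 
calls `waitForRunning()`. The class name `ParkingWaiterSketch`, the boolean 
return value, and the exact loop shape are illustrative assumptions, not the 
code in this PR:

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

// Hypothetical stand-in for the wait/notify part of ServiceThread.
class ParkingWaiterSketch {
    private final AtomicBoolean hasNotified = new AtomicBoolean(false);

    // Assumed field: the thread currently inside waitForRunning(), or null.
    // It is kept separate from any lifecycle reference used by start()/join().
    private volatile Thread parkingThread;

    void wakeup() {
        if (hasNotified.compareAndSet(false, true)) {
            Thread target = parkingThread;
            if (target != null) {
                LockSupport.unpark(target);
            }
        }
    }

    // Returns true if a notification was consumed, false on timeout or interrupt.
    boolean waitForRunning(long intervalMillis) {
        // Publish the waiter BEFORE checking the flag, so a concurrent wakeup()
        // either sees this thread and unparks it, or its flag is seen below.
        parkingThread = Thread.currentThread();
        try {
            long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(intervalMillis);
            while (!hasNotified.compareAndSet(true, false)) {
                long remaining = deadline - System.nanoTime();
                if (remaining <= 0) {
                    return false; // timed out without a notification
                }
                LockSupport.parkNanos(this, remaining);
                if (Thread.currentThread().isInterrupted()) {
                    return false; // leave the interrupt status set for the caller
                }
            }
            return true;
        } finally {
            parkingThread = null;
        }
    }
}
```

Because the sketch never touches a lifecycle field, a call from a test thread 
cannot redirect a later `join()`. It still assumes only one thread waits at a 
time.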



##########
common/src/test/java/org/apache/rocketmq/common/ServiceThreadTest.java:
##########
@@ -43,49 +64,170 @@ public void testWakeup() {
         ServiceThread testServiceThread = startTestServiceThread();
         testServiceThread.wakeup();
         assertEquals(true, testServiceThread.hasNotified.get());
-        assertEquals(0, testServiceThread.waitPoint.getCount());
     }
 
-    @Test
+    @Test(timeout = 5000)
     public void testWaitForRunning() {
         ServiceThread testServiceThread = startTestServiceThread();
-        // test waitForRunning
-        testServiceThread.waitForRunning(1000);
+        // Not notified: returns after the (short) interval with the flag 
cleared.
+        testServiceThread.waitForRunning(50);
         assertEquals(false, testServiceThread.hasNotified.get());
-        assertEquals(1, testServiceThread.waitPoint.getCount());
-        // test wake up
+        // wakeup() arms the notification.
         testServiceThread.wakeup();
         assertEquals(true, testServiceThread.hasNotified.get());
-        assertEquals(0, testServiceThread.waitPoint.getCount());
-        // repeat waitForRunning
-        testServiceThread.waitForRunning(1000);
-        assertEquals(false, testServiceThread.hasNotified.get());
-        assertEquals(0, testServiceThread.waitPoint.getCount());
-        // repeat waitForRunning again
-        testServiceThread.waitForRunning(1000);
+        // The next waitForRunning() must consume the notification 
immediately, never blocking for
+        // the (huge) interval -- this is exactly what the lost-wakeup race 
used to break.
+        long begin = System.currentTimeMillis();
+        testServiceThread.waitForRunning(TimeUnit.MINUTES.toMillis(1));
+        long elapsed = System.currentTimeMillis() - begin;
         assertEquals(false, testServiceThread.hasNotified.get());
-        assertEquals(1, testServiceThread.waitPoint.getCount());
+        assertTrue("waitForRunning() must fast-path on a pending notification, 
elapsed=" + elapsed + "ms",
+            elapsed < 1000);
     }
 
-    private ServiceThread startTestServiceThread() {
-        return startTestServiceThread(false);
+    /**
+     * A single {@code wakeup()} must wake a long-interval wait almost 
immediately, instead of
+     * letting it block for the full interval (which is what the lost-wakeup 
race used to cause).
+     */
+    @Test(timeout = 5000)
+    public void testWakeupDeliveredPromptly() throws Exception {
+        TestServiceThread service = new TestServiceThread();
+        AtomicBoolean returned = new AtomicBoolean(false);
+        long longInterval = TimeUnit.SECONDS.toMillis(10);
+
+        Thread waiter = new Thread(() -> {
+            service.doWait(longInterval);
+            returned.set(true);
+        }, "waiter");
+        waiter.start();
+
+        // Let the waiter enter the park loop.
+        Thread.sleep(200);
+
+        long startNanos = System.nanoTime();
+        service.wakeup();
+        waiter.join(2000);
+        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - 
startNanos);
+
+        assertTrue("waitForRunning() did not return after wakeup() within 2s, 
elapsed=" + elapsedMs + "ms",
+            returned.get());
+        assertTrue("wake latency should be far below the interval, elapsed=" + 
elapsedMs + "ms",
+            elapsedMs < 2000);
     }
 
-    private ServiceThread startTestServiceThread(boolean daemon) {
-        ServiceThread testServiceThread = new ServiceThread() {
+    /**
+     * Hammer the exact pattern that triggered the lost-wakeup race: a {@code 
wakeup()} fired right
+     * as the waiter is entering the wait. With the LockSupport-based 
implementation no signal may be
+     * lost, so every iteration must return well within its (long) interval.
+     */
+    @Test(timeout = 60000)
+    public void testNoWakeupLostUnderStress() throws Exception {
+        int iterations = 1000;
+        long longInterval = TimeUnit.SECONDS.toMillis(5);
+        int lost = 0;
+
+        for (int i = 0; i < iterations; i++) {
+            TestServiceThread service = new TestServiceThread();
+            AtomicBoolean returned = new AtomicBoolean(false);
+
+            Thread waiter = new Thread(() -> {
+                service.doWait(longInterval);
+                returned.set(true);
+            }, "waiter-" + i);
+            waiter.start();
+
+            // Increase the chance the wakeup lands in the CAS-to-park window.
+            Thread.yield();
+            service.wakeup();
+
+            // With the fix the waiter returns in microseconds; a lost signal 
would block for the
+            // full 5s interval, so a 2s join is more than enough to 
distinguish the two.
+            waiter.join(2000);
+            if (!returned.get()) {
+                lost++;
+                waiter.interrupt();
+                waiter.join(1000);
+            }
+        }
+
+        assertEquals("ServiceThread must not lose any wakeup signal", 0, lost);
+    }
+
+    /**
+     * Single consumer draining {@code waitForRunning} in a tight loop while 
several threads race to
+     * {@code wakeup()} it. A lost wakeup shows up as a wait that blocks for 
the full interval.
+     */
+    @Test(timeout = 30000)
+    public void serviceThreadShouldNotLoseWakeupUnderStress() throws Exception 
{
+        final int stressIterations = 10000;
+        final int wakerThreads = 4;
+        final long waitTimeoutMs = 20;
+        final long lostWakeupThresholdMs = 18;

Review Comment:
   These thresholds are extremely tight (a 20 ms wait with an 18 ms 'lost 
wakeup' cutoff) and are likely to be flaky under normal CI variance (scheduler 
jitter, GC pauses, timer granularity). To make the test robust, either 
lengthen the interval and use a much larger threshold (e.g., wait 200–1000 ms 
and fail only if the wait blocks for close to the full interval), or 
restructure the test to distinguish a prompt wakeup from a full-interval stall 
by using a long interval (seconds) and a conservative cutoff (e.g., < 500 ms).
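
The long-interval, conservative-cutoff structure could look roughly like the 
sketch below. A `Semaphore` stands in for the wait/wakeup mechanism under test, 
and the class name, constants, and method are hypothetical; only the timing 
structure is the point:

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

class StallCutoffSketch {
    // Long interval: a lost wakeup would block for seconds, not milliseconds.
    static final long WAIT_INTERVAL_MS = 5_000;
    // Conservative cutoff: far above scheduler/GC jitter, far below the interval.
    static final long STALL_CUTOFF_MS = 500;

    // Returns how many waits looked like full-interval stalls.
    static int countStalls(int wakerThreads, int wakeupsPerThread) {
        Semaphore signal = new Semaphore(0); // stand-in for the primitive under test
        Thread[] wakers = new Thread[wakerThreads];
        for (int w = 0; w < wakerThreads; w++) {
            wakers[w] = new Thread(() -> {
                for (int i = 0; i < wakeupsPerThread; i++) {
                    signal.release();
                }
            }, "waker-" + w);
            wakers[w].start();
        }

        int stalls = 0;
        try {
            // Single consumer: one timed wait per expected wakeup.
            for (int i = 0; i < wakerThreads * wakeupsPerThread; i++) {
                long begin = System.nanoTime();
                boolean woken = signal.tryAcquire(WAIT_INTERVAL_MS, TimeUnit.MILLISECONDS);
                long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - begin);
                if (!woken || elapsedMs >= STALL_CUTOFF_MS) {
                    stalls++;
                }
            }
            for (Thread waker : wakers) {
                waker.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while measuring", e);
        }
        return stalls;
    }
}
```

With a 10x gap between the cutoff and the interval, ordinary jitter cannot be 
mistaken for a stall, and a real stall cannot slip under the cutoff.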



##########
common/src/test/java/org/apache/rocketmq/common/ServiceThreadTest.java:
##########
@@ -43,49 +64,170 @@ public void testWakeup() {
         ServiceThread testServiceThread = startTestServiceThread();
         testServiceThread.wakeup();
         assertEquals(true, testServiceThread.hasNotified.get());
-        assertEquals(0, testServiceThread.waitPoint.getCount());
     }
 
-    @Test
+    @Test(timeout = 5000)
     public void testWaitForRunning() {
         ServiceThread testServiceThread = startTestServiceThread();
-        // test waitForRunning
-        testServiceThread.waitForRunning(1000);
+        // Not notified: returns after the (short) interval with the flag 
cleared.
+        testServiceThread.waitForRunning(50);
         assertEquals(false, testServiceThread.hasNotified.get());
-        assertEquals(1, testServiceThread.waitPoint.getCount());
-        // test wake up
+        // wakeup() arms the notification.
         testServiceThread.wakeup();
         assertEquals(true, testServiceThread.hasNotified.get());
-        assertEquals(0, testServiceThread.waitPoint.getCount());
-        // repeat waitForRunning
-        testServiceThread.waitForRunning(1000);
-        assertEquals(false, testServiceThread.hasNotified.get());
-        assertEquals(0, testServiceThread.waitPoint.getCount());
-        // repeat waitForRunning again
-        testServiceThread.waitForRunning(1000);
+        // The next waitForRunning() must consume the notification 
immediately, never blocking for
+        // the (huge) interval -- this is exactly what the lost-wakeup race 
used to break.
+        long begin = System.currentTimeMillis();
+        testServiceThread.waitForRunning(TimeUnit.MINUTES.toMillis(1));
+        long elapsed = System.currentTimeMillis() - begin;
         assertEquals(false, testServiceThread.hasNotified.get());
-        assertEquals(1, testServiceThread.waitPoint.getCount());
+        assertTrue("waitForRunning() must fast-path on a pending notification, 
elapsed=" + elapsed + "ms",
+            elapsed < 1000);
     }
 
-    private ServiceThread startTestServiceThread() {
-        return startTestServiceThread(false);
+    /**
+     * A single {@code wakeup()} must wake a long-interval wait almost 
immediately, instead of
+     * letting it block for the full interval (which is what the lost-wakeup 
race used to cause).
+     */
+    @Test(timeout = 5000)
+    public void testWakeupDeliveredPromptly() throws Exception {
+        TestServiceThread service = new TestServiceThread();
+        AtomicBoolean returned = new AtomicBoolean(false);
+        long longInterval = TimeUnit.SECONDS.toMillis(10);
+
+        Thread waiter = new Thread(() -> {
+            service.doWait(longInterval);
+            returned.set(true);
+        }, "waiter");
+        waiter.start();
+
+        // Let the waiter enter the park loop.
+        Thread.sleep(200);
+
+        long startNanos = System.nanoTime();
+        service.wakeup();
+        waiter.join(2000);
+        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - 
startNanos);
+
+        assertTrue("waitForRunning() did not return after wakeup() within 2s, 
elapsed=" + elapsedMs + "ms",
+            returned.get());
+        assertTrue("wake latency should be far below the interval, elapsed=" + 
elapsedMs + "ms",
+            elapsedMs < 2000);
     }
 
-    private ServiceThread startTestServiceThread(boolean daemon) {
-        ServiceThread testServiceThread = new ServiceThread() {
+    /**
+     * Hammer the exact pattern that triggered the lost-wakeup race: a {@code 
wakeup()} fired right
+     * as the waiter is entering the wait. With the LockSupport-based 
implementation no signal may be
+     * lost, so every iteration must return well within its (long) interval.
+     */
+    @Test(timeout = 60000)
+    public void testNoWakeupLostUnderStress() throws Exception {
+        int iterations = 1000;
+        long longInterval = TimeUnit.SECONDS.toMillis(5);
+        int lost = 0;
+
+        for (int i = 0; i < iterations; i++) {
+            TestServiceThread service = new TestServiceThread();
+            AtomicBoolean returned = new AtomicBoolean(false);
+
+            Thread waiter = new Thread(() -> {
+                service.doWait(longInterval);
+                returned.set(true);
+            }, "waiter-" + i);
+            waiter.start();

Review Comment:
   This test creates 1000 new threads, one per iteration, in a tight loop, 
which can significantly slow the suite and increase flakiness on constrained 
CI agents. Consider reusing a small executor or a single reusable waiter 
thread with per-iteration coordination (e.g., `CountDownLatch`/`CyclicBarrier`), 
or reduce the iteration count while keeping the 'long-interval stall' signal 
strong.
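
One way to reuse a single waiter thread is sketched below, using a 
`CyclicBarrier` as the per-iteration start line. As an assumption for 
illustration, a `Semaphore` again stands in for the `ServiceThread` wait/wakeup 
pair, and all names are hypothetical:

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ReusableWaiterSketch {
    // Runs `iterations` wait/wakeup rounds on ONE waiter thread and returns
    // the number of rounds that timed out or exceeded the cutoff.
    static int runIterations(int iterations, long intervalMs, long cutoffMs) {
        Semaphore signal = new Semaphore(0);            // stand-in for the primitive under test
        CyclicBarrier startLine = new CyclicBarrier(2); // waiter + waker meet once per round
        AtomicInteger stalls = new AtomicInteger();

        Thread waiter = new Thread(() -> {
            try {
                for (int i = 0; i < iterations; i++) {
                    startLine.await(); // both sides start the round together
                    long begin = System.nanoTime();
                    boolean woken = signal.tryAcquire(intervalMs, TimeUnit.MILLISECONDS);
                    long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - begin);
                    if (!woken || elapsedMs >= cutoffMs) {
                        stalls.incrementAndGet();
                    }
                }
            } catch (InterruptedException | BrokenBarrierException e) {
                // Stop the loop; restore the flag in case it was an interrupt.
                Thread.currentThread().interrupt();
            }
        }, "waiter");
        waiter.start();

        try {
            for (int i = 0; i < iterations; i++) {
                startLine.await(); // released when the waiter is about to wait
                signal.release();  // races with the waiter entering its wait
            }
            waiter.join();
        } catch (InterruptedException | BrokenBarrierException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("coordination failed", e);
        }
        return stalls.get();
    }
}
```

The barrier keeps the wakeup close to the moment the waiter enters its wait, 
which is the window the stress test wants to hit, without paying for a new 
thread per iteration.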



##########
common/src/test/java/org/apache/rocketmq/common/ServiceThreadTest.java:
##########
@@ -43,49 +64,170 @@ public void testWakeup() {
         ServiceThread testServiceThread = startTestServiceThread();
         testServiceThread.wakeup();
         assertEquals(true, testServiceThread.hasNotified.get());
-        assertEquals(0, testServiceThread.waitPoint.getCount());
     }
 
-    @Test
+    @Test(timeout = 5000)
     public void testWaitForRunning() {
         ServiceThread testServiceThread = startTestServiceThread();
-        // test waitForRunning
-        testServiceThread.waitForRunning(1000);
+        // Not notified: returns after the (short) interval with the flag 
cleared.
+        testServiceThread.waitForRunning(50);
         assertEquals(false, testServiceThread.hasNotified.get());
-        assertEquals(1, testServiceThread.waitPoint.getCount());
-        // test wake up
+        // wakeup() arms the notification.
         testServiceThread.wakeup();
         assertEquals(true, testServiceThread.hasNotified.get());
-        assertEquals(0, testServiceThread.waitPoint.getCount());
-        // repeat waitForRunning
-        testServiceThread.waitForRunning(1000);
-        assertEquals(false, testServiceThread.hasNotified.get());
-        assertEquals(0, testServiceThread.waitPoint.getCount());
-        // repeat waitForRunning again
-        testServiceThread.waitForRunning(1000);
+        // The next waitForRunning() must consume the notification 
immediately, never blocking for
+        // the (huge) interval -- this is exactly what the lost-wakeup race 
used to break.
+        long begin = System.currentTimeMillis();
+        testServiceThread.waitForRunning(TimeUnit.MINUTES.toMillis(1));
+        long elapsed = System.currentTimeMillis() - begin;

Review Comment:
   For measuring elapsed time in tests, prefer `System.nanoTime()` to 
`System.currentTimeMillis()`: `nanoTime()` is monotonic, whereas 
`currentTimeMillis()` follows the wall clock and can jump when the clock is 
adjusted. Using `nanoTime()` here avoids rare failures caused by clock changes 
or time-sync events.
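
A small helper along these lines would keep the measurement in one place. The 
class and method names are hypothetical and not part of the PR:

```java
import java.util.concurrent.TimeUnit;

class ElapsedTimeSketch {
    // Measures how long `action` takes using the monotonic nanoTime() source.
    static long elapsedMillis(Runnable action) {
        long begin = System.nanoTime();
        action.run();
        // Subtract first, then convert: nanoTime() values are only meaningful
        // as differences, not as absolute timestamps.
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - begin);
    }
}
```

In the test above, the call to `waitForRunning(...)` would be wrapped in the 
`Runnable`, and the result compared against the existing `< 1000` bound.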



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
