This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8076702c4c3 MINOR: Add Unit test for `TimingWheel` (#20443)
8076702c4c3 is described below

commit 8076702c4c39dc41dbc4b9f1b77ccedca14ac2e3
Author: Ken Huang <s7133...@gmail.com>
AuthorDate: Fri Sep 5 05:55:57 2025 +0800

    MINOR: Add Unit test for `TimingWheel` (#20443)
    
    There is any unit test for `TimingWheel`, we should add test for it.
    
    Reviewers: Chia-Ping Tsai <chia7...@gmail.com>
---
 .../kafka/server/util/timer/TimingWheel.java       |  10 +
 .../kafka/server/util/timer/TimingWheelTest.java   | 218 +++++++++++++++++++++
 2 files changed, 228 insertions(+)

diff --git 
a/server-common/src/main/java/org/apache/kafka/server/util/timer/TimingWheel.java
 
b/server-common/src/main/java/org/apache/kafka/server/util/timer/TimingWheel.java
index 156768fb412..f9a2029d09a 100644
--- 
a/server-common/src/main/java/org/apache/kafka/server/util/timer/TimingWheel.java
+++ 
b/server-common/src/main/java/org/apache/kafka/server/util/timer/TimingWheel.java
@@ -182,4 +182,14 @@ public class TimingWheel {
             if (overflowWheel != null) 
overflowWheel.advanceClock(currentTimeMs);
         }
     }
+
+    // only for testing
+    TimingWheel overflowWheel() {
+        return this.overflowWheel;
+    }
+
+    // only for testing
+    long currentTimeMs() {
+        return this.currentTimeMs;
+    }
 }
diff --git 
a/server-common/src/test/java/org/apache/kafka/server/util/timer/TimingWheelTest.java
 
b/server-common/src/test/java/org/apache/kafka/server/util/timer/TimingWheelTest.java
new file mode 100644
index 00000000000..48e21e18db6
--- /dev/null
+++ 
b/server-common/src/test/java/org/apache/kafka/server/util/timer/TimingWheelTest.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.util.timer;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.concurrent.DelayQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TimingWheelTest {
+
+    @Test
+    public void testAddValidTask() {
+        AtomicInteger taskCounter = new AtomicInteger(0);
+        DelayQueue<TimerTaskList> queue = new DelayQueue<>();
+        long startMs = 1000L;
+        long tickMs = 10L;
+        TimingWheel timingWheel = new TimingWheel(tickMs, 5, startMs, 
taskCounter, queue);
+
+        // Create task within current time interval
+        long expirationMs = startMs + tickMs * 2; // 1020ms
+        TimerTask task = new TestTimerTask(tickMs * 2);
+        TimerTaskEntry entry = new TimerTaskEntry(task, expirationMs);
+
+        assertTrue(timingWheel.add(entry), "Should successfully add valid 
task");
+        assertFalse(queue.isEmpty());
+        assertEquals(1, taskCounter.get());
+    }
+
+    @Test
+    public void testAddExpiredTask() {
+        long startMs = 1000L;
+        TimingWheel timingWheel = new TimingWheel(
+            10L, 
+            5, 
+            startMs, 
+            new AtomicInteger(0), 
+            new DelayQueue<>()
+        );
+
+        long expirationMs = startMs - 1; // 999ms, less than current time
+        TimerTask task = new TestTimerTask(-1);
+        TimerTaskEntry entry = new TimerTaskEntry(task, expirationMs);
+
+        assertFalse(timingWheel.add(entry), "Expired task should not be 
added");
+    }
+
+    @Test
+    public void testAddCancelledTask() {
+        long startMs = 1000L;
+        long tickMs = 10L;
+        TimingWheel timingWheel = new TimingWheel(
+            tickMs,
+            5,
+            startMs,
+            new AtomicInteger(0),
+            new DelayQueue<>()
+        );
+
+        long expirationMs = startMs + tickMs * 2;
+        TimerTask task = new TestTimerTask(tickMs * 2);
+        TimerTaskEntry entry = new TimerTaskEntry(task, expirationMs);
+
+        task.cancel();
+
+        assertFalse(timingWheel.add(entry), "Cancelled task should not be 
added");
+        assertTrue(task.isCancelled(), "Task should be marked as cancelled");
+    }
+
+    @Test
+    public void testAddTaskInCurrentBucket() {
+        long startMs = 1000L;
+        TimingWheel timingWheel = new TimingWheel(
+            10L,
+            5,
+            startMs,
+            new AtomicInteger(0),
+            new DelayQueue<>()
+        );
+
+        long expirationMs = startMs + 5; // Within current tick
+        TimerTask task = new TestTimerTask(5);
+        TimerTaskEntry entry = new TimerTaskEntry(task, expirationMs);
+
+        assertFalse(timingWheel.add(entry), "Task within current tick should 
be expired immediately");
+    }
+
+    @Test
+    public void testAdvanceClockWithinTick() {
+        long startMs = 1000L;
+        TimingWheel timingWheel = new TimingWheel(
+            10L,
+            5,
+            startMs,
+            new AtomicInteger(0),
+            new DelayQueue<>()
+        );
+
+        timingWheel.advanceClock(startMs + 5);
+
+        assertEquals(startMs, timingWheel.currentTimeMs(), "Clock should not 
advance within the same tick");
+    }
+
+    @Test
+    public void testAdvanceClockToNextTick() {
+        long startMs = 1000L;
+        long tickMs = 10L;
+        TimingWheel timingWheel = new TimingWheel(
+            tickMs,
+            5,
+            startMs,
+            new AtomicInteger(0),
+            new DelayQueue<>()
+        );
+
+        timingWheel.advanceClock(startMs + tickMs);
+
+        assertEquals(startMs + tickMs, timingWheel.currentTimeMs(), "Clock 
should advance to next tick");
+    }
+
+    @Test
+    public void testOverflowWheelCreation() {
+        long startMs = 1000L;
+        long tickMs = 10L;
+        int wheelSize = 5;
+        TimingWheel timingWheel = new TimingWheel(
+            tickMs,
+            wheelSize,
+            startMs,
+            new AtomicInteger(0),
+            new DelayQueue<>()
+        );
+        
+        assertNull(timingWheel.overflowWheel(), "Overflow wheel should not 
exist initially");
+
+        // First overflow task should create parent wheel
+        long interval = tickMs * wheelSize;
+        long overflowTime = startMs + interval + tickMs;
+
+        TimerTask task = new TestTimerTask(interval + tickMs);
+        TimerTaskEntry entry = new TimerTaskEntry(task, overflowTime);
+
+        assertTrue(timingWheel.add(entry));
+        assertNotNull(timingWheel.overflowWheel(), "Overflow wheel should be 
created");
+
+        // Adding second overflow task should use existing parent wheel
+        TimingWheel existingOverflowWheel = timingWheel.overflowWheel();
+        TimerTask task2 = new TestTimerTask(interval + tickMs + 1);
+        TimerTaskEntry entry2 = new TimerTaskEntry(task2, overflowTime + 1);
+
+        assertTrue(timingWheel.add(entry2));
+        assertSame(existingOverflowWheel, timingWheel.overflowWheel());
+    }
+
+    @Test
+    public void testAdvanceClockWithOverflowWheel() {
+        long startMs = 1000L;
+        long tickMs = 10L;
+        int wheelSize = 5;
+        TimingWheel timingWheel = new TimingWheel(
+            tickMs,
+            wheelSize,
+            startMs,
+            new AtomicInteger(0),
+            new DelayQueue<>()
+        );
+
+        // Create overflow wheel
+        long interval = tickMs * wheelSize;
+        long overflowTime = startMs + interval + tickMs;
+        TimerTask task = new TestTimerTask(interval + tickMs);
+        TimerTaskEntry entry = new TimerTaskEntry(task, overflowTime);
+        timingWheel.add(entry);
+
+        assertNotNull(timingWheel.overflowWheel(), "Overflow wheel should be 
created");
+
+        // Advancing clock should also advance overflow wheel clock
+        long advanceTime = startMs + tickMs * wheelSize + 10; // 1060ms
+        timingWheel.advanceClock(advanceTime);
+
+        // Verify both wheels advanced
+        assertEquals(advanceTime, timingWheel.currentTimeMs(), "Main wheel 
clock should advance");
+        assertEquals(startMs + tickMs * wheelSize, 
timingWheel.overflowWheel().currentTimeMs(), "Overflow wheel clock should also 
advance");
+    }
+
+    private static class TestTimerTask extends TimerTask {
+
+        TestTimerTask(long delayMs) {
+            super(delayMs);
+        }
+
+        @Override
+        public void run() {
+            // No-op
+        }
+    }
+}
\ No newline at end of file

Reply via email to