This is an automated email from the ASF dual-hosted git repository. chia7712 pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 8076702c4c3 MINOR: Add Unit test for `TimingWheel` (#20443) 8076702c4c3 is described below commit 8076702c4c39dc41dbc4b9f1b77ccedca14ac2e3 Author: Ken Huang <s7133...@gmail.com> AuthorDate: Fri Sep 5 05:55:57 2025 +0800 MINOR: Add Unit test for `TimingWheel` (#20443) There is any unit test for `TimingWheel`, we should add test for it. Reviewers: Chia-Ping Tsai <chia7...@gmail.com> --- .../kafka/server/util/timer/TimingWheel.java | 10 + .../kafka/server/util/timer/TimingWheelTest.java | 218 +++++++++++++++++++++ 2 files changed, 228 insertions(+) diff --git a/server-common/src/main/java/org/apache/kafka/server/util/timer/TimingWheel.java b/server-common/src/main/java/org/apache/kafka/server/util/timer/TimingWheel.java index 156768fb412..f9a2029d09a 100644 --- a/server-common/src/main/java/org/apache/kafka/server/util/timer/TimingWheel.java +++ b/server-common/src/main/java/org/apache/kafka/server/util/timer/TimingWheel.java @@ -182,4 +182,14 @@ public class TimingWheel { if (overflowWheel != null) overflowWheel.advanceClock(currentTimeMs); } } + + // only for testing + TimingWheel overflowWheel() { + return this.overflowWheel; + } + + // only for testing + long currentTimeMs() { + return this.currentTimeMs; + } } diff --git a/server-common/src/test/java/org/apache/kafka/server/util/timer/TimingWheelTest.java b/server-common/src/test/java/org/apache/kafka/server/util/timer/TimingWheelTest.java new file mode 100644 index 00000000000..48e21e18db6 --- /dev/null +++ b/server-common/src/test/java/org/apache/kafka/server/util/timer/TimingWheelTest.java @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.util.timer; + +import org.junit.jupiter.api.Test; + +import java.util.concurrent.DelayQueue; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class TimingWheelTest { + + @Test + public void testAddValidTask() { + AtomicInteger taskCounter = new AtomicInteger(0); + DelayQueue<TimerTaskList> queue = new DelayQueue<>(); + long startMs = 1000L; + long tickMs = 10L; + TimingWheel timingWheel = new TimingWheel(tickMs, 5, startMs, taskCounter, queue); + + // Create task within current time interval + long expirationMs = startMs + tickMs * 2; // 1020ms + TimerTask task = new TestTimerTask(tickMs * 2); + TimerTaskEntry entry = new TimerTaskEntry(task, expirationMs); + + assertTrue(timingWheel.add(entry), "Should successfully add valid task"); + assertFalse(queue.isEmpty()); + assertEquals(1, taskCounter.get()); + } + + @Test + public void testAddExpiredTask() { + long startMs = 1000L; + TimingWheel timingWheel = new TimingWheel( + 10L, + 5, + startMs, + new AtomicInteger(0), + new DelayQueue<>() + ); + + long expirationMs = startMs - 1; // 999ms, less than current time + TimerTask task = new TestTimerTask(-1); + TimerTaskEntry entry = new TimerTaskEntry(task, expirationMs); + + assertFalse(timingWheel.add(entry), "Expired task should not be added"); + } + + @Test + public void testAddCancelledTask() { + long startMs = 1000L; + long tickMs = 10L; + TimingWheel timingWheel = new TimingWheel( + tickMs, + 5, + startMs, + new AtomicInteger(0), + new DelayQueue<>() + ); + + long expirationMs = startMs + tickMs * 2; + TimerTask task = new TestTimerTask(tickMs * 2); + TimerTaskEntry entry = new TimerTaskEntry(task, expirationMs); + + task.cancel(); + + assertFalse(timingWheel.add(entry), "Cancelled task should not be added"); + assertTrue(task.isCancelled(), "Task should be marked as cancelled"); + } + + @Test + public void testAddTaskInCurrentBucket() { + long startMs = 1000L; + TimingWheel timingWheel = new TimingWheel( + 10L, + 5, + startMs, + new AtomicInteger(0), + new DelayQueue<>() + ); + + long expirationMs = startMs + 5; // Within current tick + TimerTask task = new TestTimerTask(5); + TimerTaskEntry entry = new TimerTaskEntry(task, expirationMs); + + assertFalse(timingWheel.add(entry), "Task within current tick should be expired immediately"); + } + + @Test + public void testAdvanceClockWithinTick() { + long startMs = 1000L; + TimingWheel timingWheel = new TimingWheel( + 10L, + 5, + startMs, + new AtomicInteger(0), + new DelayQueue<>() + ); + + timingWheel.advanceClock(startMs + 5); + + assertEquals(startMs, timingWheel.currentTimeMs(), "Clock should not advance within the same tick"); + } + + @Test + public void testAdvanceClockToNextTick() { + long startMs = 1000L; + long tickMs = 10L; + TimingWheel timingWheel = new TimingWheel( + tickMs, + 5, + startMs, + new AtomicInteger(0), + new DelayQueue<>() + ); + + timingWheel.advanceClock(startMs + tickMs); + + assertEquals(startMs + tickMs, timingWheel.currentTimeMs(), "Clock should advance to next tick"); + } + + @Test + public void testOverflowWheelCreation() { + long startMs = 1000L; + long tickMs = 10L; + int wheelSize = 5; + TimingWheel timingWheel = new TimingWheel( + tickMs, + wheelSize, + startMs, + new AtomicInteger(0), + new DelayQueue<>() + ); + + assertNull(timingWheel.overflowWheel(), "Overflow wheel should not exist initially"); + + // First overflow task should create parent wheel + long interval = tickMs * wheelSize; + long overflowTime = startMs + interval + tickMs; + + TimerTask task = new TestTimerTask(interval + tickMs); + TimerTaskEntry entry = new TimerTaskEntry(task, overflowTime); + + assertTrue(timingWheel.add(entry)); + assertNotNull(timingWheel.overflowWheel(), "Overflow wheel should be created"); + + // Adding second overflow task should use existing parent wheel + TimingWheel existingOverflowWheel = timingWheel.overflowWheel(); + TimerTask task2 = new TestTimerTask(interval + tickMs + 1); + TimerTaskEntry entry2 = new TimerTaskEntry(task2, overflowTime + 1); + + assertTrue(timingWheel.add(entry2)); + assertSame(existingOverflowWheel, timingWheel.overflowWheel()); + } + + @Test + public void testAdvanceClockWithOverflowWheel() { + long startMs = 1000L; + long tickMs = 10L; + int wheelSize = 5; + TimingWheel timingWheel = new TimingWheel( + tickMs, + wheelSize, + startMs, + new AtomicInteger(0), + new DelayQueue<>() + ); + + // Create overflow wheel + long interval = tickMs * wheelSize; + long overflowTime = startMs + interval + tickMs; + TimerTask task = new TestTimerTask(interval + tickMs); + TimerTaskEntry entry = new TimerTaskEntry(task, overflowTime); + timingWheel.add(entry); + + assertNotNull(timingWheel.overflowWheel(), "Overflow wheel should be created"); + + // Advancing clock should also advance overflow wheel clock + long advanceTime = startMs + tickMs * wheelSize + 10; // 1060ms + timingWheel.advanceClock(advanceTime); + + // Verify both wheels advanced + assertEquals(advanceTime, timingWheel.currentTimeMs(), "Main wheel clock should advance"); + assertEquals(startMs + tickMs * wheelSize, timingWheel.overflowWheel().currentTimeMs(), "Overflow wheel clock should also advance"); + } + + private static class TestTimerTask extends TimerTask { + + TestTimerTask(long delayMs) { + super(delayMs); + } + + @Override + public void run() { + // No-op + } + } +} \ No newline at end of file