This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0076b65f994 KAFKA-19182 Move SchedulerTest to server module (#19608)
0076b65f994 is described below

commit 0076b65f9941ef345db06b4153b63bb4d36f2fea
Author: Uladzislau Blok <123193120+uladzislaub...@users.noreply.github.com>
AuthorDate: Thu May 8 18:02:38 2025 +0200

    KAFKA-19182 Move SchedulerTest to server module (#19608)
    
    This PR moves SchedulerTest to server module and rewrite it with java.
    
    Please also check updated import control config!
    
    Reviewers: Ken Huang <s7133...@gmail.com>, Chia-Ping Tsai
     <chia7...@gmail.com>
---
 build.gradle                                       |   1 +
 checkstyle/import-control-server.xml               |   4 +
 .../scala/unit/kafka/utils/SchedulerTest.scala     | 234 ------------------
 .../apache/kafka/server/util/SchedulerTest.java    | 266 +++++++++++++++++++++
 4 files changed, 271 insertions(+), 234 deletions(-)

diff --git a/build.gradle b/build.gradle
index a26e02cbffe..d2ac9aa1dc4 100644
--- a/build.gradle
+++ b/build.gradle
@@ -911,6 +911,7 @@ project(':server') {
     testImplementation project(':test-common:test-common-internal-api')
     testImplementation project(':test-common:test-common-runtime')
     testImplementation project(':storage:storage-api').sourceSets.test.output
+    testImplementation project(':server-common').sourceSets.test.output
 
     testRuntimeOnly runtimeTestLibs
   }
diff --git a/checkstyle/import-control-server.xml 
b/checkstyle/import-control-server.xml
index 30a7f5fbe76..2f96777351a 100644
--- a/checkstyle/import-control-server.xml
+++ b/checkstyle/import-control-server.xml
@@ -100,6 +100,10 @@
     <subpackage name="share">
       <allow pkg="org.apache.kafka.storage.log.metrics" />
     </subpackage>
+    <subpackage name="util">
+      <allow pkg="org.apache.kafka.storage.log.metrics" />
+      <allow pkg="org.apache.kafka.storage.internals.epoch" />
+    </subpackage>
   </subpackage>
 
   <subpackage name="security">
diff --git a/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala 
b/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala
deleted file mode 100644
index 8518a469c7d..00000000000
--- a/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala
+++ /dev/null
@@ -1,234 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.utils
-
-import java.util.{Optional, Properties}
-import java.util.concurrent.atomic._
-import java.util.concurrent.{ConcurrentHashMap, CountDownLatch, Executors, 
TimeUnit}
-import kafka.utils.TestUtils.retry
-import org.apache.kafka.coordinator.transaction.TransactionLogConfig
-import org.apache.kafka.server.util.{KafkaScheduler, MockTime}
-import org.apache.kafka.storage.internals.log.{LocalLog, LogConfig, 
LogDirFailureChannel, LogLoader, LogOffsetsListener, LogSegments, 
ProducerStateManager, ProducerStateManagerConfig, UnifiedLog}
-import org.apache.kafka.storage.log.metrics.BrokerTopicStats
-import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, Timeout}
-
-
-class SchedulerTest {
-
-  val scheduler = new KafkaScheduler(1)
-  val mockTime = new MockTime
-  val counter1 = new AtomicInteger(0)
-  val counter2 = new AtomicInteger(0)
-
-  @BeforeEach
-  def setup(): Unit = {
-    scheduler.startup()
-  }
-
-  @AfterEach
-  def teardown(): Unit = {
-    scheduler.shutdown()
-  }
-
-  @Test
-  def testMockSchedulerNonPeriodicTask(): Unit = {
-    mockTime.scheduler.scheduleOnce("test1", () => counter1.getAndIncrement(), 
1)
-    mockTime.scheduler.scheduleOnce("test2", () => counter2.getAndIncrement(), 
100)
-    assertEquals(0, counter1.get, "Counter1 should not be incremented prior to 
task running.")
-    assertEquals(0, counter2.get, "Counter2 should not be incremented prior to 
task running.")
-    mockTime.sleep(1)
-    assertEquals(1, counter1.get, "Counter1 should be incremented")
-    assertEquals(0, counter2.get, "Counter2 should not be incremented")
-    mockTime.sleep(100000)
-    assertEquals(1, counter1.get, "More sleeping should not result in more 
incrementing on counter1.")
-    assertEquals(1, counter2.get, "Counter2 should now be incremented.")
-  }
-
-  @Test
-  def testMockSchedulerPeriodicTask(): Unit = {
-    mockTime.scheduler.schedule("test1", () => counter1.getAndIncrement(), 1, 
1)
-    mockTime.scheduler.schedule("test2", () => counter2.getAndIncrement(), 
100, 100)
-    assertEquals(0, counter1.get, "Counter1 should not be incremented prior to 
task running.")
-    assertEquals(0, counter2.get, "Counter2 should not be incremented prior to 
task running.")
-    mockTime.sleep(1)
-    assertEquals(1, counter1.get, "Counter1 should be incremented")
-    assertEquals(0, counter2.get, "Counter2 should not be incremented")
-    mockTime.sleep(100)
-    assertEquals(101, counter1.get, "Counter1 should be incremented 101 times")
-    assertEquals(1, counter2.get, "Counter2 should not be incremented once")
-  }
-
-  @Test
-  def testReentrantTaskInMockScheduler(): Unit = {
-    mockTime.scheduler.scheduleOnce("test1", () => 
mockTime.scheduler.scheduleOnce("test2", () => counter2.getAndIncrement(), 0), 
1)
-    mockTime.sleep(1)
-    assertEquals(1, counter2.get)
-  }
-
-  @Test
-  def testNonPeriodicTask(): Unit = {
-    scheduler.scheduleOnce("test", () => counter1.getAndIncrement())
-    retry(30000) {
-      assertEquals(counter1.get, 1)
-    }
-    Thread.sleep(5)
-    assertEquals(1, counter1.get, "Should only run once")
-  }
-
-  @Test
-  def testNonPeriodicTaskWhenPeriodIsZero(): Unit = {
-    scheduler.schedule("test", () => counter1.getAndIncrement(), 0, 0)
-    retry(30000) {
-      assertEquals(counter1.get, 1)
-    }
-    Thread.sleep(5)
-    assertEquals(1, counter1.get, "Should only run once")
-  }
-
-  @Test
-  def testPeriodicTask(): Unit = {
-    scheduler.schedule("test", () => counter1.getAndIncrement(), 0, 5)
-    retry(30000) {
-      assertTrue(counter1.get >= 20, "Should count to 20")
-    }
-  }
-
-  @Test
-  def testRestart(): Unit = {
-    // schedule a task to increment a counter
-    mockTime.scheduler.scheduleOnce("test1", () => counter1.getAndIncrement(), 
1)
-    mockTime.sleep(1)
-    assertEquals(1, counter1.get())
-
-    // restart the scheduler
-    mockTime.scheduler.shutdown()
-    mockTime.scheduler.startup()
-
-    // schedule another task to increment the counter
-    mockTime.scheduler.scheduleOnce("test1", () => counter1.getAndIncrement(), 
1)
-    mockTime.sleep(1)
-    assertEquals(2, counter1.get())
-  }
-
-  @Test
-  def testUnscheduleProducerTask(): Unit = {
-    val tmpDir = TestUtils.tempDir()
-    val logDir = TestUtils.randomPartitionLogDir(tmpDir)
-    val logConfig = new LogConfig(new Properties())
-    val brokerTopicStats = new BrokerTopicStats
-    val maxTransactionTimeoutMs = 5 * 60 * 1000
-    val maxProducerIdExpirationMs = 
TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT
-    val producerIdExpirationCheckIntervalMs = 
TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT
-    val topicPartition = UnifiedLog.parseTopicPartitionName(logDir)
-    val logDirFailureChannel = new LogDirFailureChannel(10)
-    val segments = new LogSegments(topicPartition)
-    val leaderEpochCache = UnifiedLog.createLeaderEpochCache(
-      logDir, topicPartition, logDirFailureChannel, Optional.empty, 
mockTime.scheduler)
-    val producerStateManagerConfig = new 
ProducerStateManagerConfig(maxProducerIdExpirationMs, false)
-    val producerStateManager = new ProducerStateManager(topicPartition, logDir,
-      maxTransactionTimeoutMs, producerStateManagerConfig, mockTime)
-    val offsets = new LogLoader(
-      logDir,
-      topicPartition,
-      logConfig,
-      scheduler,
-      mockTime,
-      logDirFailureChannel,
-      true,
-      segments,
-      0L,
-      0L,
-      leaderEpochCache,
-      producerStateManager,
-      new ConcurrentHashMap[String, Integer],
-      false
-    ).load()
-    val localLog = new LocalLog(logDir, logConfig, segments, 
offsets.recoveryPoint,
-      offsets.nextOffsetMetadata, scheduler, mockTime, topicPartition, 
logDirFailureChannel)
-    val log = new UnifiedLog(offsets.logStartOffset,
-      localLog,
-      brokerTopicStats,
-      producerIdExpirationCheckIntervalMs,
-      leaderEpochCache,
-      producerStateManager,
-      Optional.empty,
-      false,
-      LogOffsetsListener.NO_OP_OFFSETS_LISTENER)
-    assertTrue(scheduler.taskRunning(log.producerExpireCheck))
-    log.close()
-    assertFalse(scheduler.taskRunning(log.producerExpireCheck))
-  }
-
-  /**
-   * Verify that scheduler lock is not held when invoking task method, 
allowing new tasks to be scheduled
-   * when another is being executed. This is required to avoid deadlocks when:
-   *   a) Thread1 executes a task which attempts to acquire LockA
-   *   b) Thread2 holding LockA attempts to schedule a new task
-   */
-  @Timeout(15)
-  @Test
-  def testMockSchedulerLocking(): Unit = {
-    val initLatch = new CountDownLatch(1)
-    val completionLatch = new CountDownLatch(2)
-    val taskLatches = List(new CountDownLatch(1), new CountDownLatch(1))
-    def scheduledTask(taskLatch: CountDownLatch): Unit = {
-      initLatch.countDown()
-      assertTrue(taskLatch.await(30, TimeUnit.SECONDS), "Timed out waiting for 
latch")
-      completionLatch.countDown()
-    }
-    mockTime.scheduler.scheduleOnce("test1", () => 
scheduledTask(taskLatches.head), 1)
-    val tickExecutor = Executors.newSingleThreadScheduledExecutor()
-    try {
-      tickExecutor.scheduleWithFixedDelay(() => mockTime.sleep(1), 0, 1, 
TimeUnit.MILLISECONDS)
-
-      // wait for first task to execute and then schedule the next task while 
the first one is running
-      assertTrue(initLatch.await(10, TimeUnit.SECONDS))
-      mockTime.scheduler.scheduleOnce("test2", () => 
scheduledTask(taskLatches(1)), 1)
-
-      taskLatches.foreach(_.countDown())
-      assertTrue(completionLatch.await(10, TimeUnit.SECONDS), "Tasks did not 
complete")
-
-    } finally {
-      tickExecutor.shutdownNow()
-    }
-  }
-
-  @Test
-  def testPendingTaskSize(): Unit = {
-    val latch1 = new CountDownLatch(1)
-    val latch2 = new CountDownLatch(2)
-    val task1 = new Runnable {
-      override def run(): Unit = {
-        latch1.await()
-      }
-    }
-    scheduler.scheduleOnce("task1", task1, 0)
-    scheduler.scheduleOnce("task2", () => latch2.countDown(), 5)
-    scheduler.scheduleOnce("task3", () => latch2.countDown(), 5)
-    retry(30000) {
-      assertEquals(2, scheduler.pendingTaskSize())
-    }
-    latch1.countDown()
-    latch2.await()
-    retry(30000) {
-      assertEquals(0, scheduler.pendingTaskSize())
-    }
-    scheduler.shutdown()
-    assertEquals(0, scheduler.pendingTaskSize())
-  }
-}
diff --git 
a/server/src/test/java/org/apache/kafka/server/util/SchedulerTest.java 
b/server/src/test/java/org/apache/kafka/server/util/SchedulerTest.java
new file mode 100644
index 00000000000..933a14b7766
--- /dev/null
+++ b/server/src/test/java/org/apache/kafka/server/util/SchedulerTest.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.util;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.coordinator.transaction.TransactionLogConfig;
+import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache;
+import org.apache.kafka.storage.internals.log.LoadedLogOffsets;
+import org.apache.kafka.storage.internals.log.LocalLog;
+import org.apache.kafka.storage.internals.log.LogConfig;
+import org.apache.kafka.storage.internals.log.LogDirFailureChannel;
+import org.apache.kafka.storage.internals.log.LogLoader;
+import org.apache.kafka.storage.internals.log.LogOffsetsListener;
+import org.apache.kafka.storage.internals.log.LogSegments;
+import org.apache.kafka.storage.internals.log.ProducerStateManager;
+import org.apache.kafka.storage.internals.log.ProducerStateManagerConfig;
+import org.apache.kafka.storage.internals.log.UnifiedLog;
+import org.apache.kafka.storage.log.metrics.BrokerTopicStats;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class SchedulerTest {
+
+    private final KafkaScheduler scheduler = new KafkaScheduler(1);
+    private final MockTime mockTime = new MockTime();
+    private final AtomicInteger counter1 = new AtomicInteger(0);
+    private final AtomicInteger counter2 = new AtomicInteger(0);
+
+    @BeforeEach
+    void setup() {
+        counter1.set(0);
+        counter2.set(0);
+        scheduler.startup();
+    }
+
+    @AfterEach
+    void teardown() throws InterruptedException {
+        scheduler.shutdown();
+    }
+
+    @Test
+    void testMockSchedulerNonPeriodicTask() {
+        mockTime.scheduler.scheduleOnce("test1", counter1::getAndIncrement, 1);
+        mockTime.scheduler.scheduleOnce("test2", counter2::getAndIncrement, 
100);
+        assertEquals(0, counter1.get(), "Counter1 should not be incremented 
prior to task running.");
+        assertEquals(0, counter2.get(), "Counter2 should not be incremented 
prior to task running.");
+        mockTime.sleep(1);
+        assertEquals(1, counter1.get(), "Counter1 should be incremented");
+        assertEquals(0, counter2.get(), "Counter2 should not be incremented");
+        mockTime.sleep(100000);
+        assertEquals(1, counter1.get(), "More sleeping should not result in 
more incrementing on counter1.");
+        assertEquals(1, counter2.get(), "Counter2 should now be incremented.");
+    }
+
+    @Test
+    void testMockSchedulerPeriodicTask() {
+        mockTime.scheduler.schedule("test1", counter1::getAndIncrement, 1, 1);
+        mockTime.scheduler.schedule("test2", counter2::getAndIncrement, 100, 
100);
+        assertEquals(0, counter1.get(), "Counter1 should not be incremented 
prior to task running.");
+        assertEquals(0, counter2.get(), "Counter2 should not be incremented 
prior to task running.");
+        mockTime.sleep(1);
+        assertEquals(1, counter1.get(), "Counter1 should be incremented");
+        assertEquals(0, counter2.get(), "Counter2 should not be incremented");
+        mockTime.sleep(100);
+        assertEquals(101, counter1.get(), "Counter1 should be incremented 101 
times");
+        assertEquals(1, counter2.get(), "Counter2 should not be incremented 
once");
+    }
+
+    @Test
+    void testReentrantTaskInMockScheduler() {
+        mockTime.scheduler.scheduleOnce("test1", () -> 
mockTime.scheduler.scheduleOnce("test2", counter2::getAndIncrement, 0), 1);
+        mockTime.sleep(1);
+        assertEquals(1, counter2.get());
+    }
+
+    @Test
+    void testNonPeriodicTask() throws InterruptedException {
+        scheduler.scheduleOnce("test", counter1::getAndIncrement);
+        TestUtils.waitForCondition(() -> counter1.get() == 1, "Scheduled task 
was not executed");
+        Thread.sleep(5);
+        assertEquals(1, counter1.get(), "Should only run once");
+    }
+
+    @Test
+    void testNonPeriodicTaskWhenPeriodIsZero() throws InterruptedException {
+        scheduler.schedule("test", counter1::getAndIncrement, 0, 0);
+        TestUtils.waitForCondition(() -> counter1.get() == 1, "Scheduled task 
was not executed");
+        Thread.sleep(5);
+        assertEquals(1, counter1.get(), "Should only run once");
+    }
+
+    @Test
+    void testPeriodicTask() throws InterruptedException {
+        scheduler.schedule("test", counter1::getAndIncrement, 0, 5);
+        TestUtils.waitForCondition(() -> counter1.get() >= 20, "Should count 
to 20");
+    }
+
+    @Test
+    void testRestart() throws InterruptedException {
+        // schedule a task to increment a counter
+        mockTime.scheduler.scheduleOnce("test1", counter1::getAndIncrement, 1);
+        mockTime.sleep(1);
+        assertEquals(1, counter1.get());
+
+        // restart the scheduler
+        mockTime.scheduler.shutdown();
+        mockTime.scheduler.startup();
+
+        // schedule another task to increment the counter
+        mockTime.scheduler.scheduleOnce("test1", counter1::getAndIncrement, 1);
+        mockTime.sleep(1);
+        assertEquals(2, counter1.get());
+    }
+
+    @Test
+    void testUnscheduleProducerTask() throws IOException {
+        File tmpDir = TestUtils.tempDirectory();
+        File logDir = TestUtils.randomPartitionLogDir(tmpDir);
+        LogConfig logConfig = new LogConfig(new Properties());
+        BrokerTopicStats brokerTopicStats = new BrokerTopicStats();
+        int maxTransactionTimeoutMs = 5 * 60 * 1000;
+        int maxProducerIdExpirationMs = 
TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT;
+        int producerIdExpirationCheckIntervalMs = 
TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT;
+        TopicPartition topicPartition = 
UnifiedLog.parseTopicPartitionName(logDir);
+        LogDirFailureChannel logDirFailureChannel = new 
LogDirFailureChannel(10);
+        LogSegments segments = new LogSegments(topicPartition);
+        LeaderEpochFileCache leaderEpochCache = 
UnifiedLog.createLeaderEpochCache(logDir, topicPartition,
+                logDirFailureChannel, Optional.empty(), mockTime.scheduler);
+        ProducerStateManagerConfig producerStateManagerConfig = new 
ProducerStateManagerConfig(maxProducerIdExpirationMs, false);
+        ProducerStateManager producerStateManager = new 
ProducerStateManager(topicPartition, logDir,
+                maxTransactionTimeoutMs, producerStateManagerConfig, mockTime);
+        LoadedLogOffsets offsets = new LogLoader(
+                logDir,
+                topicPartition,
+                logConfig,
+                scheduler,
+                mockTime,
+                logDirFailureChannel,
+                true,
+                segments,
+                0L,
+                0L,
+                leaderEpochCache,
+                producerStateManager,
+                new ConcurrentHashMap<>(), false).load();
+        LocalLog localLog = new LocalLog(logDir, logConfig, segments, 
offsets.recoveryPoint,
+                offsets.nextOffsetMetadata, scheduler, mockTime, 
topicPartition, logDirFailureChannel);
+        UnifiedLog log = new UnifiedLog(offsets.logStartOffset,
+                localLog,
+                brokerTopicStats,
+                producerIdExpirationCheckIntervalMs,
+                leaderEpochCache,
+                producerStateManager,
+                Optional.empty(),
+                false,
+                LogOffsetsListener.NO_OP_OFFSETS_LISTENER);
+        assertTrue(scheduler.taskRunning(log.producerExpireCheck()));
+        log.close();
+        assertFalse(scheduler.taskRunning(log.producerExpireCheck()));
+    }
+
+    /**
+     * Verify that scheduler lock is not held when invoking task method, 
allowing new tasks to be scheduled
+     * when another is being executed. This is required to avoid deadlocks 
when:
+     * <ul>
+     *     <li>Thread1 executes a task which attempts to acquire LockA</li>
+     *     <li>Thread2 holding LockA attempts to schedule a new task</li>
+     * </ul>
+     */
+    @Timeout(15)
+    @Test
+    void testMockSchedulerLocking() throws InterruptedException {
+        CountDownLatch initLatch = new CountDownLatch(1);
+        CountDownLatch completionLatch = new CountDownLatch(2);
+        List<CountDownLatch> taskLatches = List.of(new CountDownLatch(1), new 
CountDownLatch(1));
+        InterruptedConsumer<CountDownLatch> scheduledTask = taskLatch -> {
+            initLatch.countDown();
+            assertTrue(taskLatch.await(30, TimeUnit.SECONDS), "Timed out 
waiting for latch");
+            completionLatch.countDown();
+        };
+        mockTime.scheduler.scheduleOnce("test1", interruptedRunnableWrapper(() 
-> scheduledTask.accept(taskLatches.get(0))), 1);
+        ScheduledExecutorService tickExecutor = 
Executors.newSingleThreadScheduledExecutor();
+        try {
+            tickExecutor.scheduleWithFixedDelay(() -> mockTime.sleep(1), 0, 1, 
TimeUnit.MILLISECONDS);
+
+            // wait for first task to execute and then schedule the next task 
while the first one is running
+            assertTrue(initLatch.await(10, TimeUnit.SECONDS));
+            mockTime.scheduler.scheduleOnce("test2", 
interruptedRunnableWrapper(() -> scheduledTask.accept(taskLatches.get(1))), 1);
+
+            taskLatches.forEach(CountDownLatch::countDown);
+            assertTrue(completionLatch.await(10, TimeUnit.SECONDS), "Tasks did 
not complete");
+        } finally {
+            tickExecutor.shutdownNow();
+        }
+    }
+
+    @Test
+    void testPendingTaskSize() throws InterruptedException {
+        CountDownLatch latch1 = new CountDownLatch(1);
+        CountDownLatch latch2 = new CountDownLatch(2);
+        scheduler.scheduleOnce("task1", 
interruptedRunnableWrapper(latch1::await), 0);
+        scheduler.scheduleOnce("task2", latch2::countDown, 5);
+        scheduler.scheduleOnce("task3", latch2::countDown, 5);
+        TestUtils.waitForCondition(() -> scheduler.pendingTaskSize() <= 2, 
"Scheduled task was not executed");
+        latch1.countDown();
+        latch2.await();
+        TestUtils.waitForCondition(() -> scheduler.pendingTaskSize() == 0, 
"Scheduled task was not executed");
+        scheduler.shutdown();
+        assertEquals(0, scheduler.pendingTaskSize());
+    }
+
+    @FunctionalInterface
+    private interface InterruptedConsumer<T> {
+        void accept(T t) throws InterruptedException;
+    }
+
+    @FunctionalInterface
+    private interface InterruptedRunnable {
+        void run() throws InterruptedException;
+    }
+
+    private static Runnable interruptedRunnableWrapper(InterruptedRunnable 
runnable) {
+        return () -> {
+            try {
+                runnable.run();
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw new RuntimeException(e);
+            }
+        };
+    }
+}

Reply via email to