This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 04e3acb546c MINOR: Fix flaky test in ReplicaManagerTest (#21244)
04e3acb546c is described below

commit 04e3acb546c866c6084020bcb7a72390b66344e5
Author: Parker Chang <[email protected]>
AuthorDate: Wed Jan 7 03:15:35 2026 +0800

    MINOR: Fix flaky test in ReplicaManagerTest (#21244)
    
    Refer to
    https://github.com/apache/kafka/pull/20082#discussion_r2639707507.
    
    Refactored the test to fix a race condition caused by dynamic Mockito
    stubbing during test execution.
    
    The previous implementation used `doReturn(false)` and `reset()` on a
    spy object while a background thread was running, causing a
    `ClassCastException`.
    
    This patch replaces that logic with a thread-safe `AtomicBoolean` and
    `doAnswer` approach to toggle the mock's behavior safely.
    
    ## Test Command
    ```
    N=100; I=0; while [ $I -lt $N ] && ./gradlew cleanTest core:test --tests \
    ReplicaManagerTest -PmaxParallelForks=4 \
    ; do (( I=$I+1 )); echo "Completed run: $I"; sleep 1; done
    ```
    
    ## Test Result
    ```
    BUILD SUCCESSFUL in 12s
    151 actionable tasks: 2 executed, 149 up-to-date
    Consider enabling configuration cache to speed up this build:
    https://docs.gradle.org/9.2.1/userguide/configuration_cache_enabling.html
    Completed run: 100
    ```
    
    Reviewers: Gaurav Narula <[email protected]>, Chia-Ping Tsai
     <[email protected]>, PoAn Yang <[email protected]>
---
 .../scala/unit/kafka/server/ReplicaManagerTest.scala   | 18 ++++++++++++------
 1 file changed, 12 insertions(+), 6 deletions(-)

diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index e9d6518364b..556c4b4dcf3 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -94,7 +94,7 @@ import java.io.{ByteArrayInputStream, File}
 import java.net.InetAddress
 import java.nio.file.{Files, Paths}
 import java.util
-import java.util.concurrent.atomic.{AtomicLong, AtomicReference}
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong, AtomicReference}
 import java.util.concurrent.{Callable, CompletableFuture, ConcurrentHashMap, 
CountDownLatch, Future, TimeUnit}
 import java.util.function.{BiConsumer, Consumer}
 import java.util.stream.IntStream
@@ -5514,6 +5514,17 @@ class ReplicaManagerTest {
 
     try {
       val spiedPartition = spy(Partition(tpId, time, replicaManager))
+
+      // Prevent promotion of future replica
+      val blockPromotion = new AtomicBoolean(true)
+      doAnswer { invocation =>
+        if (blockPromotion.compareAndSet(true, false)) {
+          false
+        } else {
+          invocation.callRealMethod()
+        }
+      }.when(spiedPartition).maybeReplaceCurrentWithFutureReplica()
+
       replicaManager.addOnlinePartition(tp, spiedPartition)
 
       val leaderDelta = topicsCreateDelta(localId, isStartIdLeader = true, 
partitions = List(0, 1), List.empty, topic, topicIds(topic))
@@ -5526,9 +5537,6 @@ class ReplicaManagerTest {
       val newReplicaFolder = replicaManager.logManager.liveLogDirs.filterNot(_ 
== firstLogDir).head
       replicaManager.alterReplicaLogDirs(Map(tp -> 
newReplicaFolder.getAbsolutePath))
 
-      // Prevent promotion of future replica
-      
doReturn(false).when(spiedPartition).maybeReplaceCurrentWithFutureReplica()
-
       // Make sure the future log is created with the correct topic ID.
       val futureLog = replicaManager.futureLocalLogOrException(tp)
       assertEquals(Optional.of(topicId), futureLog.topicId)
@@ -5537,8 +5545,6 @@ class ReplicaManagerTest {
       val finalReplicaFolder = 
replicaManager.logManager.liveLogDirs.filterNot(it => it == firstLogDir || it 
== newReplicaFolder).head
       replicaManager.alterReplicaLogDirs(Map(tp -> 
finalReplicaFolder.getAbsolutePath))
 
-      reset(spiedPartition)
-
       TestUtils.waitUntilTrue(() => {
         replicaManager.replicaAlterLogDirsManager.shutdownIdleFetcherThreads()
         replicaManager.replicaAlterLogDirsManager.fetcherThreadMap.isEmpty

Reply via email to