This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 04e3acb546c MINOR: Fix flaky test in ReplicaManagerTest (#21244)
04e3acb546c is described below
commit 04e3acb546c866c6084020bcb7a72390b66344e5
Author: Parker Chang <[email protected]>
AuthorDate: Wed Jan 7 03:15:35 2026 +0800
MINOR: Fix flaky test in ReplicaManagerTest (#21244)
Refer to
https://github.com/apache/kafka/pull/20082#discussion_r2639707507.
Refactored the test to fix a race condition caused by dynamic Mockito
stubbing during test execution.
The previous implementation used `doReturn(false)` and `reset()` on a
spy object while a background thread was running, causing a
`ClassCastException`.
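This patch replaces that logic with a thread-safe `AtomicBoolean` and a
`doAnswer` stub that is installed once, before the partition is registered
with the replica manager: the first call to
`maybeReplaceCurrentWithFutureReplica()` returns `false`, and every later
call delegates to the real method, so the spy is never re-stubbed while the
background thread is running.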
## Test Command
```
N=100; I=0; while [ $I -lt $N ] && ./gradlew cleanTest core:test --tests ReplicaManagerTest -PmaxParallelForks=4 \
; do (( I=$I+1 )); echo "Completed run: $I"; sleep 1; done
```
## Test Result
```
BUILD SUCCESSFUL in 12s
151 actionable tasks: 2 executed, 149 up-to-date
Consider enabling configuration cache to speed up this build:
https://docs.gradle.org/9.2.1/userguide/configuration_cache_enabling.html
Completed run: 100
```
Reviewers: Gaurav Narula <[email protected]>, Chia-Ping Tsai
<[email protected]>, PoAn Yang <[email protected]>
---
.../scala/unit/kafka/server/ReplicaManagerTest.scala | 18 ++++++++++++------
1 file changed, 12 insertions(+), 6 deletions(-)
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index e9d6518364b..556c4b4dcf3 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -94,7 +94,7 @@ import java.io.{ByteArrayInputStream, File}
import java.net.InetAddress
import java.nio.file.{Files, Paths}
import java.util
-import java.util.concurrent.atomic.{AtomicLong, AtomicReference}
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong, AtomicReference}
import java.util.concurrent.{Callable, CompletableFuture, ConcurrentHashMap,
CountDownLatch, Future, TimeUnit}
import java.util.function.{BiConsumer, Consumer}
import java.util.stream.IntStream
@@ -5514,6 +5514,17 @@ class ReplicaManagerTest {
try {
val spiedPartition = spy(Partition(tpId, time, replicaManager))
+
+ // Prevent promotion of future replica
+ val blockPromotion = new AtomicBoolean(true)
+ doAnswer { invocation =>
+ if (blockPromotion.compareAndSet(true, false)) {
+ false
+ } else {
+ invocation.callRealMethod()
+ }
+ }.when(spiedPartition).maybeReplaceCurrentWithFutureReplica()
+
replicaManager.addOnlinePartition(tp, spiedPartition)
val leaderDelta = topicsCreateDelta(localId, isStartIdLeader = true,
partitions = List(0, 1), List.empty, topic, topicIds(topic))
@@ -5526,9 +5537,6 @@ class ReplicaManagerTest {
val newReplicaFolder = replicaManager.logManager.liveLogDirs.filterNot(_
== firstLogDir).head
replicaManager.alterReplicaLogDirs(Map(tp ->
newReplicaFolder.getAbsolutePath))
- // Prevent promotion of future replica
- doReturn(false).when(spiedPartition).maybeReplaceCurrentWithFutureReplica()
-
// Make sure the future log is created with the correct topic ID.
val futureLog = replicaManager.futureLocalLogOrException(tp)
assertEquals(Optional.of(topicId), futureLog.topicId)
@@ -5537,8 +5545,6 @@ class ReplicaManagerTest {
val finalReplicaFolder =
replicaManager.logManager.liveLogDirs.filterNot(it => it == firstLogDir || it
== newReplicaFolder).head
replicaManager.alterReplicaLogDirs(Map(tp ->
finalReplicaFolder.getAbsolutePath))
- reset(spiedPartition)
-
TestUtils.waitUntilTrue(() => {
replicaManager.replicaAlterLogDirsManager.shutdownIdleFetcherThreads()
replicaManager.replicaAlterLogDirsManager.fetcherThreadMap.isEmpty