This is an automated email from the ASF dual-hosted git repository.
Yicong-Huang pushed a commit to branch release/v1.2
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/release/v1.2 by this push:
new 1ea384fd85 test(amber): fix ConcurrentModificationException flake in RegionExecutionCoordinatorSpec (#5562)
1ea384fd85 is described below
commit 1ea384fd85f31d390c58b879548dbc767c0a0ccc
Author: Kunwoo (Chris)
AuthorDate: Fri Jun 12 05:30:10 2026 +0000
test(amber): fix ConcurrentModificationException flake in RegionExecutionCoordinatorSpec (#5562)
### What changes were proposed in this PR?
`RegionExecutionCoordinatorSpec`'s *"retry EndWorker failures…"* test
polled the `ControllerRpcProbe.calls` buffer from the test thread
(`waitUntil(endWorkerCalls.size >= 2)`) while the coordinator's 200 ms
`EndWorker` retry appended to it from the kill-retry timer thread.
Whenever that read raced an append, it tripped Scala 2.13's
`MutationTracker`, which surfaced as a non-deterministic
`java.util.ConcurrentModificationException`.
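The check that fires can be reproduced deterministically on a single thread. The sketch below is a minimal illustration, assuming the Scala 2.13 standard library's checked `ArrayBuffer` iterator that the description refers to; `MutationTrackerDemo` and the string entries are hypothetical, and the second append stands in for the timer thread's retry. In the real test the read and the append ran on different threads, so the exception appeared only when they happened to interleave.

```scala
import java.util.ConcurrentModificationException
import scala.collection.mutable.ArrayBuffer

// Hypothetical demo object, not part of the PR.
object MutationTrackerDemo {
  // Single-threaded stand-in for the race: create an iterator (the "read"),
  // append to the buffer (the timer thread's "append"), then use the iterator.
  def appendDuringIterationThrows(): Boolean = {
    val calls = ArrayBuffer("EndWorker#1")
    val it = calls.iterator // captures the buffer's current mutation count
    calls += "EndWorker#2"  // bumps the mutation count
    try {
      it.hasNext            // Scala 2.13's checked iterator compares the counts here
      false
    } catch {
      case _: ConcurrentModificationException => true
    }
  }
}
```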
The `calls` buffer is test-only (production has no such buffer and
never reads it), so the race is a property of the test, not the source.
Rather than making the test helper thread-safe, this PR fixes the test
itself: instead of polling, it waits on a `CountDownLatch` that the
probe callback counts down once the retry's `EndWorker` is recorded, so
the test thread never iterates the buffer while the timer thread is
appending to it. The real timer-thread retry still runs, so the
production path is exercised faithfully; the accesses are simply ordered
(append → latch → read) instead of overlapping. No production code is
changed; `ControllerRpcProbe` keeps its plain `ArrayBuffer`.
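A minimal, self-contained sketch of the append → latch → read ordering follows. It is not the spec's code: `Probe`, `simulateRetry`, the string entries, and the 5 s deadline are hypothetical stand-ins, and a `ScheduledExecutorService` plays the role of the coordinator's kill-retry timer. It shows the reader blocking on `CountDownLatch.await` instead of polling, and reading the plain `ArrayBuffer` only after the writer's `countDown()`.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
import scala.collection.mutable.ArrayBuffer

object LatchOrderingSketch {
  // Hypothetical stand-in for ControllerRpcProbe: a plain, non-thread-safe buffer.
  final class Probe {
    val calls: ArrayBuffer[String] = ArrayBuffer.empty[String]
  }

  def simulateRetry(): List[String] = {
    val probe = new Probe
    val retryAttempted = new CountDownLatch(1)
    probe.calls += "EndWorker#1" // first attempt, recorded on the calling (test) thread

    val timer = Executors.newSingleThreadScheduledExecutor()
    try {
      // The retry fires 200 ms later on the timer thread: append first, then count down.
      timer.schedule(
        new Runnable {
          def run(): Unit = {
            probe.calls += "EndWorker#2"
            retryAttempted.countDown()
          }
        },
        200,
        TimeUnit.MILLISECONDS
      )

      // Block on the latch instead of polling `calls`, so no read overlaps the append.
      assert(
        retryAttempted.await(5, TimeUnit.SECONDS),
        "EndWorker was not retried within the deadline"
      )
      // Safe: countDown() happens-before a successful await() returns.
      probe.calls.toList
    } finally {
      timer.shutdownNow()
    }
  }
}
```

A one-shot latch fits a test that waits for exactly one retry. Swapping the buffer for a concurrent collection would also avoid the exception, but it would change the helper rather than the test, which is what the PR deliberately avoids.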
### Any related issues, documentation, discussions?
Resolves #5546
### How was this PR tested?
`RegionExecutionCoordinatorSpec` + `WorkflowExecutionCoordinatorSpec` →
10/10 pass. The retry test is race-free by construction: its only reads
of the call buffer happen after the latch's `await` returns, i.e. after
the timer thread has finished appending, so no read can overlap an
append.
```
sbt 'WorkflowExecutionService/testOnly org.apache.texera.amber.engine.architecture.scheduling.RegionExecutionCoordinatorSpec'
```
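The race-free argument also holds on the timeout path: `CountDownLatch.await(long, TimeUnit)` returns `false` when the deadline passes, and in the spec the surrounding `assert` then fails before any buffer read. The hypothetical helper below (not part of the PR) makes both branches explicit by evaluating the read only when `await` returns `true`.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}

object TimedAwaitSketch {
  // Hypothetical helper: evaluates `read` only if the latch opens within the
  // deadline. On success, `read` runs after countDown(); on timeout, await
  // returns false and `read` is never evaluated.
  def readAfterLatch[A](latch: CountDownLatch, timeoutMs: Long)(read: => A): Option[A] =
    if (latch.await(timeoutMs, TimeUnit.MILLISECONDS)) Some(read) else None
}
```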
### Was this PR authored or co-authored using generative AI tooling?
(backported from commit 80542aaaab476b675b10dbd54787c75982913b91)
Generated-by: Claude Code (Anthropic Claude Opus 4.8)
---
.../scheduling/RegionExecutionCoordinatorSpec.scala | 12 +++++++++++-
1 file changed, 11 insertions(+), 1 deletion(-)
diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinatorSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinatorSpec.scala
index 6efbe5e4ca..9e6cb227e5 100644
--- a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinatorSpec.scala
+++ b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinatorSpec.scala
@@ -35,6 +35,7 @@ import org.apache.texera.amber.engine.common.virtualidentity.util.CONTROLLER
import org.scalatest.BeforeAndAfterAll
import org.scalatest.flatspec.AnyFlatSpecLike
+import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.concurrent.atomic
/**
@@ -84,11 +85,17 @@ class RegionExecutionCoordinatorSpec
it should "retry EndWorker failures and delay gracefulStop until a retry succeeds" in {
val attempts = new atomic.AtomicInteger(0)
+ // The first EndWorker is sent on the test thread; the retry is sent later from the coordinator's
+ // kill-retry timer thread. Block on this latch — counted down from the probe callback once the
+ // retry's call has been recorded — instead of polling `endWorkerCalls` from the test thread, so
+ // the test never iterates the call buffer while the timer thread is appending to it.
+ val retryAttempted = new CountDownLatch(1)
val fixture = createSingleRegionFixture(endWorkerResponse =
_ =>
if (attempts.incrementAndGet() == 1) {
Some(transientEndWorkerFailure)
} else {
+ retryAttempted.countDown()
None
}
)
@@ -96,7 +103,10 @@ class RegionExecutionCoordinatorSpec
launchRegion(fixture.coordinator)
val completion = requestRegionCompletion(fixture.coordinator)
- waitUntil(fixture.rpcProbe.endWorkerCalls.size >= 2)
+ assert(
+ retryAttempted.await(testTimeout.inMilliseconds, TimeUnit.MILLISECONDS),
+ "EndWorker was not retried within the deadline"
+ )
assert(completion.poll.isEmpty)
assert(!fixture.coordinator.isCompleted)
assert(fixture.actorRefService.hasActorRef(fixture.workerId))