This is an automated email from the ASF dual-hosted git repository.

github-merge-queue[bot] pushed a commit to branch gh-readonly-queue/main/pr-5562-39a12345a50292c3b047b7a44f8848a7c7102d8a
in repository https://gitbox.apache.org/repos/asf/texera.git

commit 80542aaaab476b675b10dbd54787c75982913b91
Author: Kunwoo (Chris) <[email protected]>
AuthorDate: Thu Jun 11 22:17:54 2026 -0700

    test(amber): fix ConcurrentModificationException flake in RegionExecutionCoordinatorSpec (#5562)

    ### What changes were proposed in this PR?

    `RegionExecutionCoordinatorSpec`'s *"retry EndWorker failures…"* test
    polled the `ControllerRpcProbe.calls` buffer from the test thread
    (`waitUntil(endWorkerCalls.size >= 2)`) while the coordinator's 200 ms
    `EndWorker` retry appended to it from the kill-retry timer thread. That
    read racing an append tripped Scala 2.13's `MutationTracker` and surfaced
    as a non-deterministic `java.util.ConcurrentModificationException`.

    The `calls` buffer is test-only — production has no such buffer and never
    reads it — so the race is a property of the test, not the source. Rather
    than make the test helper thread-safe, this fixes the test: it waits on a
    `CountDownLatch` (counted down from the probe callback once the retry's
    `EndWorker` is recorded) instead of polling, so the test thread never
    iterates the buffer while the timer thread appends. The real timer-thread
    retry still runs, so the production path is exercised faithfully — the
    accesses are just ordered (append → latch → read) instead of overlapping.

    No production code is changed; `ControllerRpcProbe` keeps its plain
    `ArrayBuffer`.

    ### Any related issues, documentation, discussions?

    Resolves #5546

    ### How was this PR tested?

    `RegionExecutionCoordinatorSpec` + `WorkflowExecutionCoordinatorSpec` →
    10/10 pass. The retry test is race-free by construction: its only reads
    of the call buffer happen after the latch `await` returns — i.e. after
    the timer thread has finished appending — so no read can overlap an
    append.

    ```
    sbt 'WorkflowExecutionService/testOnly org.apache.texera.amber.engine.architecture.scheduling.RegionExecutionCoordinatorSpec'
    ```

    ### Was this PR authored or co-authored using generative AI tooling?

    Generated-by: Claude Code (Anthropic Claude Opus 4.8)
---
 .../scheduling/RegionExecutionCoordinatorSpec.scala | 12 +++++++++++-
 1 file changed, 11 insertions(+), 1 deletion(-)

diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinatorSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinatorSpec.scala
index 6efbe5e4ca..9e6cb227e5 100644
--- a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinatorSpec.scala
+++ b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinatorSpec.scala
@@ -35,6 +35,7 @@ import org.apache.texera.amber.engine.common.virtualidentity.util.CONTROLLER
 import org.scalatest.BeforeAndAfterAll
 import org.scalatest.flatspec.AnyFlatSpecLike
 
+import java.util.concurrent.{CountDownLatch, TimeUnit}
 import java.util.concurrent.atomic
 
 /**
@@ -84,11 +85,17 @@ class RegionExecutionCoordinatorSpec
 
   it should "retry EndWorker failures and delay gracefulStop until a retry succeeds" in {
     val attempts = new atomic.AtomicInteger(0)
+    // The first EndWorker is sent on the test thread; the retry is sent later from the coordinator's
+    // kill-retry timer thread. Block on this latch — counted down from the probe callback once the
+    // retry's call has been recorded — instead of polling `endWorkerCalls` from the test thread, so
+    // the test never iterates the call buffer while the timer thread is appending to it.
+    val retryAttempted = new CountDownLatch(1)
     val fixture = createSingleRegionFixture(endWorkerResponse =
       _ =>
         if (attempts.incrementAndGet() == 1) {
           Some(transientEndWorkerFailure)
         } else {
+          retryAttempted.countDown()
           None
         }
     )
@@ -96,7 +103,10 @@ class RegionExecutionCoordinatorSpec
     launchRegion(fixture.coordinator)
     val completion = requestRegionCompletion(fixture.coordinator)
 
-    waitUntil(fixture.rpcProbe.endWorkerCalls.size >= 2)
+    assert(
+      retryAttempted.await(testTimeout.inMilliseconds, TimeUnit.MILLISECONDS),
+      "EndWorker was not retried within the deadline"
+    )
     assert(completion.poll.isEmpty)
     assert(!fixture.coordinator.isCompleted)
     assert(fixture.actorRefService.hasActorRef(fixture.workerId))
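
For readers unfamiliar with the pattern, the ordering the commit message describes (append, then latch, then read) can be sketched in isolation. This is only an illustrative sketch, not code from the Texera repository: `LatchOrderingSketch`, `CallProbe`, `runOnce`, and the call labels are hypothetical names, and a plain `ScheduledExecutorService` stands in for the coordinator's kill-retry timer. Unlike the actual test, which counts the latch down from the `endWorkerResponse` callback, the sketch counts it down right after the timer thread's append.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
import scala.collection.mutable.ArrayBuffer

object LatchOrderingSketch {

  // Hypothetical stand-in for a test-only call recorder backed by a plain,
  // non-thread-safe ArrayBuffer.
  final class CallProbe {
    val calls: ArrayBuffer[String] = ArrayBuffer.empty
  }

  // Records one call on the caller's thread and a "retry" on a timer thread.
  // The buffer is read only after the latch has been counted down, so the read
  // can never overlap the timer thread's append.
  // Returns None if the retry was not recorded within timeoutMs.
  def runOnce(retryDelayMs: Long, timeoutMs: Long): Option[List[String]] = {
    val probe = new CallProbe
    val retryRecorded = new CountDownLatch(1)
    val timer = Executors.newSingleThreadScheduledExecutor()
    try {
      // First attempt: appended on the calling thread before the task is scheduled.
      probe.calls += "EndWorker#1"

      // Retry: appended later from the timer thread, then signalled via the latch.
      timer.schedule(
        new Runnable {
          def run(): Unit = {
            probe.calls += "EndWorker#2"
            retryRecorded.countDown()
          }
        },
        retryDelayMs,
        TimeUnit.MILLISECONDS
      )

      // Block instead of polling: countDown() happens-before a successful await(),
      // so the append is complete and visible once await returns true.
      if (retryRecorded.await(timeoutMs, TimeUnit.MILLISECONDS)) Some(probe.calls.toList)
      else None
    } finally {
      // Stop the timer thread; this also cancels the retry if it has not run yet.
      timer.shutdownNow()
    }
  }

  def main(args: Array[String]): Unit =
    println(runOnce(retryDelayMs = 200, timeoutMs = 5000))
}
```

A polling variant would call `probe.calls.size` or iterate the buffer in a loop on the caller's thread while the timer thread may be mid-append; the latch removes that overlap without making the buffer itself thread-safe.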
