This is an automated email from the ASF dual-hosted git repository.
aglinxinyuan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new 9eb33cef0b fix(amber): make region kill synchronous before scheduling
next region (#4557)
9eb33cef0b is described below
commit 9eb33cef0b11aa6a1d6e2e107293bf95cd0c60c2
Author: Xiaozhen Liu <[email protected]>
AuthorDate: Thu Apr 30 16:13:19 2026 -0700
fix(amber): make region kill synchronous before scheduling next region
(#4557)
### What changes were proposed in this PR?
This PR makes region termination synchronous with respect to region
scheduling. Previously, the workflow coordinator could schedule the next
region before the previous region's workers were fully terminated.
The main changes are:
- Keep `RegionExecutionCoordinator` in a non-completed phase while
worker termination is still in progress.
- Store and reuse a single termination future per region so repeated
coordination calls do not start duplicate kill attempts.
- Wait for region termination futures in `WorkflowExecutionCoordinator`
before scheduling the next region.
- Send `gracefulStop` only after all workers successfully reply to
`endWorker`.
- Retry region termination every 200ms if `endWorker` fails because a
worker still has queued messages.
- Keep `EndHandler` strict: `endWorker` fails whenever the worker still
has any unprocessed message.
- Prevent premature workflow completion while there are pending
scheduled regions or unfinished region coordinators.
- Emit final workflow completion from the coordinator terminal point
when no next region exists.
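
The single-termination-future and retry behavior described above can be sketched roughly as follows. This is a minimal, self-contained illustration and not the code in this PR: it swaps in `scala.concurrent.Future` and a `ScheduledExecutorService` for the Twitter `Future` and `JavaTimer` used by the coordinator, and `TerminationSketch`, `RegionTerminator`, `withRetry`, and `demo` are hypothetical names. It also installs a `Promise` before starting the kill, so only the caller that wins the compare-and-set starts an attempt, which is a slight variation on the diff below.

```scala
import java.util.concurrent.{Executors, ThreadFactory, TimeUnit}
import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._

// Hypothetical sketch; names are illustrative, not Texera APIs.
object TerminationSketch {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Daemon scheduler so pending retries never keep the JVM alive.
  private val scheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactory {
    def newThread(r: Runnable): Thread = {
      val t = new Thread(r, "kill-retry")
      t.setDaemon(true)
      t
    }
  })

  // Completes after `delay` without blocking a thread.
  private def sleep(delay: FiniteDuration): Future[Unit] = {
    val p = Promise[Unit]()
    scheduler.schedule(
      new Runnable { def run(): Unit = p.success(()) },
      delay.toMillis,
      TimeUnit.MILLISECONDS
    )
    p.future
  }

  // Re-runs `kill` after `delay` whenever the returned future fails (unbounded retries).
  def withRetry(kill: () => Future[Unit], delay: FiniteDuration): Future[Unit] =
    kill().recoverWith { case _ => sleep(delay).flatMap(_ => withRetry(kill, delay)) }

  class RegionTerminator(kill: () => Future[Unit], delay: FiniteDuration) {
    private val terminationRef = new AtomicReference[Promise[Unit]]()
    @volatile private var completed = false

    def isCompleted: Boolean = completed

    // Repeated calls share one termination future; only the CAS winner starts the kill.
    def terminate(): Future[Unit] = {
      val existing = terminationRef.get
      if (existing != null) existing.future
      else {
        val promise = Promise[Unit]()
        if (terminationRef.compareAndSet(null, promise)) {
          // Mark completion only after the kill has actually succeeded.
          promise.completeWith(withRetry(kill, delay).map(_ => completed = true))
          promise.future
        } else terminationRef.get.future
      }
    }
  }

  // Usage: the first two kill attempts fail, the third succeeds.
  def demo(): (Int, Boolean) = {
    val attempts = new AtomicInteger(0)
    val kill = () =>
      if (attempts.incrementAndGet() < 3)
        Future.failed[Unit](new IllegalStateException("worker still has unprocessed messages"))
      else Future.unit
    val terminator = new RegionTerminator(kill, 200.millis)
    val first = terminator.terminate()
    val second = terminator.terminate() // reuses the in-flight termination
    Await.result(first, 5.seconds)
    Await.result(second, 5.seconds)
    (attempts.get, terminator.isCompleted)
  }
}
```

In this sketch the second `terminate()` call does not trigger an extra kill attempt, and `isCompleted` only becomes true once a retry succeeds, which mirrors the ordering the bullets above describe.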
### Any related issues, documentation, discussions?
Closes #4556
### How was this PR tested?
Manually tested. Added test cases for `RegionExecutionCoordinator`,
`WorkflowExecutionCoordinator`, and `EndHandler`.
### Was this PR authored or co-authored using generative AI tooling?
Generated-by: OpenAI Codex
---------
Co-authored-by: Xinyuan Lin <[email protected]>
---
.../controller/WorkflowScheduler.scala | 2 +
.../WorkerExecutionCompletedHandler.scala | 6 +-
.../scheduling/RegionExecutionCoordinator.scala | 56 ++++-
.../scheduling/WorkflowExecutionCoordinator.scala | 45 ++--
.../worker/promisehandlers/EndHandler.scala | 2 +-
.../scheduling/RegionCoordinatorTestSupport.scala | 241 +++++++++++++++++++++
.../RegionExecutionCoordinatorSpec.scala | 202 +++++++++++++++++
.../WorkflowExecutionCoordinatorSpec.scala | 93 ++++++++
.../worker/promisehandlers/EndHandlerSpec.scala | 113 ++++++++++
9 files changed, 733 insertions(+), 27 deletions(-)
diff --git
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/WorkflowScheduler.scala
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/WorkflowScheduler.scala
index 9dcf3ad4bf..b1acb3c065 100644
---
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/WorkflowScheduler.scala
+++
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/WorkflowScheduler.scala
@@ -54,4 +54,6 @@ class WorkflowScheduler(
def getNextRegions: Set[Region] = if (!schedule.hasNext) Set() else
schedule.next()
+ def hasPendingRegions: Boolean = schedule != null && schedule.hasNext
+
}
diff --git
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/WorkerExecutionCompletedHandler.scala
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/WorkerExecutionCompletedHandler.scala
index d54a22f26b..c3b3ddb234 100644
---
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/WorkerExecutionCompletedHandler.scala
+++
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/WorkerExecutionCompletedHandler.scala
@@ -61,7 +61,11 @@ trait WorkerExecutionCompletedHandler {
.collect(Seq(statsRequest))
.flatMap(_ => {
// if entire workflow is completed, clean up
- if (cp.workflowExecution.isCompleted) {
+ val isWorkflowTerminal =
+ cp.workflowExecution.isCompleted &&
+ !cp.workflowScheduler.hasPendingRegions &&
+ !cp.workflowExecutionCoordinator.hasUnfinishedRegionCoordinators
+ if (isWorkflowTerminal) {
// after query result come back: send completed event, cleanup ,and
kill workflow
sendToClient(ExecutionStateUpdate(cp.workflowExecution.getState))
cp.controllerTimerService.disableStatusUpdate()
diff --git
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
index db9c583a68..254c16bf34 100644
---
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
+++
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
@@ -20,7 +20,7 @@
package org.apache.texera.amber.engine.architecture.scheduling
import org.apache.pekko.pattern.gracefulStop
-import com.twitter.util.{Future, Return, Throw}
+import com.twitter.util.{Duration => TwitterDuration, Future, JavaTimer,
Return, Throw, Timer}
import org.apache.texera.amber.core.storage.DocumentFactory
import org.apache.texera.amber.core.storage.VFSURIFactory.decodeURI
import org.apache.texera.amber.core.virtualidentity.ActorVirtualIdentity
@@ -61,7 +61,7 @@ import
org.apache.texera.web.resource.dashboard.user.workflow.WorkflowExecutions
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicReference
-import scala.concurrent.duration.Duration
+import scala.concurrent.duration.{Duration => ScalaDuration}
/**
* The executor of a region.
@@ -110,10 +110,14 @@ class RegionExecutionCoordinator(
private val currentPhaseRef: AtomicReference[RegionExecutionPhase] = new
AtomicReference(
Unexecuted
)
+ private val terminationFutureRef: AtomicReference[Future[Unit]] = new
AtomicReference(null)
+ private val killRetryTimer: Timer = new JavaTimer(true)
+ private val killRetryDelay: TwitterDuration =
TwitterDuration.fromMilliseconds(200)
/**
* Sync the status of `RegionExecution` and transition this coordinator's
phase to `Completed` only when the
- * coordinator is currently in `ExecutingNonDependeePortsPhase` and all the
ports of this region are completed.
+ * coordinator is currently in `ExecutingNonDependeePortsPhase`, all the
ports of this region are completed, and
+ * all workers in this region are terminated.
*
* Additionally, this method will also terminate all the workers of this
region:
*
@@ -136,12 +140,22 @@ class RegionExecutionCoordinator(
return Future.Unit
}
- // Set this coordinator's status to be completed so that subsequent
regions can be started by
- // WorkflowExecutionCoordinator.
- setPhase(Completed)
-
- // Terminate all the workers in this region.
- terminateWorkers(regionExecution)
+ val existingTerminationFuture = terminationFutureRef.get
+ if (existingTerminationFuture != null) {
+ existingTerminationFuture
+ } else {
+ val terminationFuture =
terminateWorkersWithRetry(regionExecution).flatMap { _ =>
+ // Set this coordinator's status to be completed so that subsequent
regions can be started by
+ // WorkflowExecutionCoordinator.
+ setPhase(Completed)
+ Future.Unit
+ }
+ if (terminationFutureRef.compareAndSet(null, terminationFuture)) {
+ terminationFuture
+ } else {
+ terminationFutureRef.get
+ }
+ }
}
private def terminateWorkers(regionExecution: RegionExecution) = {
@@ -172,7 +186,7 @@ class RegionExecutionCoordinator(
// controller does not reuse old control-message sequence
numbers for new workers.
asyncRPCClient.inputGateway.removeControlChannel(workerId)
asyncRPCClient.outputGateway.removeControlChannel(workerId)
- gracefulStop(actorRef, Duration(5,
TimeUnit.SECONDS)).asTwitter()
+ gracefulStop(actorRef, ScalaDuration(5,
TimeUnit.SECONDS)).asTwitter()
}
}.toSeq
@@ -196,8 +210,30 @@ class RegionExecutionCoordinator(
}
}
+ private def terminateWorkersWithRetry(
+ regionExecution: RegionExecution,
+ attempt: Int = 1
+ ): Future[Unit] = {
+ terminateWorkers(regionExecution).rescue {
+ case err =>
+ logger.warn(
+ s"Failed to terminate region ${region.id.id} on attempt $attempt.
Retrying in ${killRetryDelay.inMilliseconds} ms.",
+ err
+ )
+ Future
+ .sleep(killRetryDelay)(killRetryTimer)
+ .flatMap(_ => terminateWorkersWithRetry(regionExecution, attempt +
1))
+ }
+ }
+
def isCompleted: Boolean = currentPhaseRef.get == Completed
+ /**
+ * Returns the region termination future if termination has been initiated.
+ * This is only set by `tryCompleteRegionExecution()`.
+ */
+ def getTerminationFutureOpt: Option[Future[Unit]] =
Option(terminationFutureRef.get)
+
/**
* This will sync and transition the region execution phase from one to
another depending on its current phase:
*
diff --git
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala
index 5a9c84c701..4b639fc241 100644
---
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala
+++
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala
@@ -27,9 +27,11 @@ import org.apache.texera.amber.engine.architecture.common.{
AkkaActorService
}
import org.apache.texera.amber.engine.architecture.controller.ControllerConfig
+import
org.apache.texera.amber.engine.architecture.controller.ExecutionStateUpdate
import
org.apache.texera.amber.engine.architecture.controller.execution.WorkflowExecution
import org.apache.texera.amber.engine.common.rpc.AsyncRPCClient
+import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.mutable
class WorkflowExecutionCoordinator(
@@ -44,6 +46,7 @@ class WorkflowExecutionCoordinator(
private val regionExecutionCoordinators
: mutable.HashMap[RegionIdentity, RegionExecutionCoordinator] =
mutable.HashMap()
+ private val completionNotified: AtomicBoolean = new AtomicBoolean(false)
@transient var actorRefService: AkkaActorRefMappingService = _
@@ -59,18 +62,19 @@ class WorkflowExecutionCoordinator(
* After the syncs, if there are no running region(s), it will start new
regions (if available).
*/
def coordinateRegionExecutors(actorService: AkkaActorService): Future[Unit]
= {
- if (regionExecutionCoordinators.values.exists(!_.isCompleted)) {
- // As this method is invoked by the completion of each port in a region,
and regionExecutionCoordinator only
- // lanuches each phase asynchronously, we need to let each current
unfinished regionExecutionCoordinator
- // sync its status and proceed with next phases if needed.
- Future
- .collect({
- regionExecutionCoordinators.values
- .filter(!_.isCompleted)
- .map(_.syncStatusAndTransitionRegionExecutionPhase())
- .toSeq
- })
+ val unfinishedRegionCoordinators =
+ regionExecutionCoordinators.values.filter(!_.isCompleted).toSeq
+
+ // Trigger sync for each unfinished region.
+
unfinishedRegionCoordinators.foreach(_.syncStatusAndTransitionRegionExecutionPhase())
+
+ // Wait only for region termination futures (kill path), then re-run
coordination.
+ val terminationFutures =
unfinishedRegionCoordinators.flatMap(_.getTerminationFutureOpt)
+ if (terminationFutures.nonEmpty) {
+ return Future
+ .collect(terminationFutures)
.unit
+ .flatMap(_ => coordinateRegionExecutors(actorService))
}
if (regionExecutionCoordinators.values.exists(!_.isCompleted)) {
@@ -79,10 +83,17 @@ class WorkflowExecutionCoordinator(
}
// All existing regions are completed. Start the next region (if any).
+ val nextRegions = getNextRegions()
+ if (nextRegions.isEmpty) {
+ if (workflowExecution.isCompleted &&
completionNotified.compareAndSet(false, true)) {
+
asyncRPCClient.sendToClient(ExecutionStateUpdate(workflowExecution.getState))
+ }
+ return Future.Unit
+ }
+
+ executedRegions.append(nextRegions)
Future
- .collect({
- val nextRegions = getNextRegions()
- executedRegions.append(nextRegions)
+ .collect(
nextRegions
.map(region => {
val isRestart = workflowExecution.hasRegionExecution(region.id)
@@ -104,7 +115,7 @@ class WorkflowExecutionCoordinator(
})
.map(_.syncStatusAndTransitionRegionExecutionPhase())
.toSeq
- })
+ )
.unit
}
@@ -122,4 +133,8 @@ class WorkflowExecutionCoordinator(
.toSet
}
+ def hasUnfinishedRegionCoordinators: Boolean = {
+ regionExecutionCoordinators.values.exists(!_.isCompleted)
+ }
+
}
diff --git
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/EndHandler.scala
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/EndHandler.scala
index 2a6a20b3d3..0504e66f52 100644
---
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/EndHandler.scala
+++
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/EndHandler.scala
@@ -48,8 +48,8 @@ trait EndHandler {
s"Received EndHandler before all messages are processed. Unprocessed
messages: " +
s"${dp.inputManager.inputMessageQueue.peek()}"
)
+ return Future.exception(new IllegalStateException("worker still has
unprocessed messages"))
}
- assert(dp.inputManager.inputMessageQueue.isEmpty)
// Now we can safely acknowledge that this worker can be terminated.
EmptyReturn()
}
diff --git
a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionCoordinatorTestSupport.scala
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionCoordinatorTestSupport.scala
new file mode 100644
index 0000000000..facba10241
--- /dev/null
+++
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionCoordinatorTestSupport.scala
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.engine.architecture.scheduling
+
+import com.twitter.util.{Await, Duration, Future}
+import org.apache.pekko.actor.{Actor, ActorRef, Props}
+import org.apache.pekko.testkit.{TestActorRef, TestKit}
+import org.apache.texera.amber.core.executor.OpExecWithClassName
+import org.apache.texera.amber.core.virtualidentity.{
+ ActorVirtualIdentity,
+ ChannelIdentity,
+ OperatorIdentity,
+ PhysicalOpIdentity
+}
+import org.apache.texera.amber.core.workflow.PhysicalOp
+import org.apache.texera.amber.core.workflow.WorkflowContext.{
+ DEFAULT_EXECUTION_ID,
+ DEFAULT_WORKFLOW_ID
+}
+import org.apache.texera.amber.engine.architecture.common.{
+ AkkaActorRefMappingService,
+ AkkaActorService,
+ WorkflowActor
+}
+import
org.apache.texera.amber.engine.architecture.controller.execution.WorkflowExecution
+import org.apache.texera.amber.engine.architecture.messaginglayer.{
+ NetworkInputGateway,
+ NetworkOutputGateway
+}
+import
org.apache.texera.amber.engine.architecture.rpc.controlcommands.ControlInvocation
+import org.apache.texera.amber.engine.architecture.rpc.controlreturns._
+import org.apache.texera.amber.engine.architecture.scheduling.config.{
+ OperatorConfig,
+ ResourceConfig,
+ WorkerConfig
+}
+import
org.apache.texera.amber.engine.architecture.worker.statistics.WorkerState
+import org.apache.texera.amber.engine.common.CheckpointState
+import org.apache.texera.amber.engine.common.ambermessage.WorkflowFIFOMessage
+import org.apache.texera.amber.engine.common.rpc.AsyncRPCClient
+import org.apache.texera.amber.engine.common.virtualidentity.util.CONTROLLER
+import org.apache.texera.amber.util.VirtualIdentityUtils
+
+import scala.collection.mutable
+
+object RegionCoordinatorTestSupport {
+ val InitializeExecutor = "initializeExecutor"
+ val OpenExecutor = "openExecutor"
+ val StartWorker = "startWorker"
+ val EndWorker = "endWorker"
+
+ // Generous deadline for the polling helpers below. Production timing under
test (notably the
+ // 200 ms `killRetryDelay` in `RegionExecutionCoordinator`) fits
comfortably; the rest is
+ // headroom for slow CI.
+ val testTimeout: Duration = Duration.fromSeconds(5)
+
+ case class WorkerRpcCall(
+ methodName: String,
+ receiver: ActorVirtualIdentity,
+ commandId: Long
+ )
+
+ case class ControllerHarnessFixture(
+ actorService: AkkaActorService,
+ actorRefService: AkkaActorRefMappingService
+ )
+
+ /**
+ * Captures controller-to-worker RPCs at the same boundary used by
production
+ * `AsyncRPCClient.workerInterface`.
+ *
+ * Non-termination RPCs are completed immediately because these tests focus
on termination
+ * ordering. `endWorker` responses are controlled by `endWorkerResponse`,
allowing each test to
+ * hold termination pending, fail an attempt, or allow it to succeed.
+ */
+ class ControllerRpcProbe(endWorkerResponse: WorkerRpcCall =>
Option[ControlReturn]) {
+ val calls: mutable.ArrayBuffer[WorkerRpcCall] = mutable.ArrayBuffer()
+ val inputGateway = new NetworkInputGateway(CONTROLLER)
+ val outputGateway = new NetworkOutputGateway(CONTROLLER, handleOutput)
+ val asyncRPCClient = new AsyncRPCClient(inputGateway, outputGateway,
CONTROLLER)
+
+ def methodTrace: Seq[String] = calls.map(_.methodName).toSeq
+
+ def initializedWorkers: Seq[ActorVirtualIdentity] =
+ calls.filter(_.methodName == InitializeExecutor).map(_.receiver).toSeq
+
+ def startedWorkers: Seq[ActorVirtualIdentity] =
+ calls.filter(_.methodName == StartWorker).map(_.receiver).toSeq
+
+ def endWorkerCalls: Seq[WorkerRpcCall] =
+ calls.filter(_.methodName == EndWorker).toSeq
+
+ def onlyEndWorkerCall: WorkerRpcCall = {
+ assert(endWorkerCalls.size == 1)
+ endWorkerCalls.head
+ }
+
+ def fulfill(call: WorkerRpcCall, returnValue: ControlReturn): Unit = {
+ asyncRPCClient.fulfillPromise(ReturnInvocation(call.commandId,
returnValue))
+ }
+
+ private def handleOutput(message: WorkflowFIFOMessage): Unit = {
+ message.payload match {
+ case invocation: ControlInvocation =>
+ recordAndMaybeFulfill(invocation)
+ case _ =>
+ // Client events and stats updates are irrelevant to the coordinator
lifecycle assertions.
+ }
+ }
+
+ private def recordAndMaybeFulfill(invocation: ControlInvocation): Unit = {
+ val call = WorkerRpcCall(
+ methodName = invocation.methodName,
+ receiver = invocation.context.receiver,
+ commandId = invocation.commandId
+ )
+ calls += call
+ immediateReturn(call).foreach(fulfill(call, _))
+ }
+
+ private def immediateReturn(call: WorkerRpcCall): Option[ControlReturn] = {
+ call.methodName match {
+ case InitializeExecutor | OpenExecutor =>
+ Some(EmptyReturn())
+ case StartWorker =>
+ Some(WorkerStateResponse(WorkerState.RUNNING))
+ case EndWorker =>
+ endWorkerResponse(call)
+ case other =>
+ throw new AssertionError(s"Unexpected worker RPC in test: $other")
+ }
+ }
+ }
+
+ class IdleActor extends Actor {
+ override def receive: Receive = { case _ => () }
+ }
+
+ class ControllerHarness extends WorkflowActor(None, CONTROLLER) {
+ override def handleInputMessage(id: Long, workflowMsg:
WorkflowFIFOMessage): Unit = ()
+
+ override def getQueuedCredit(channelId: ChannelIdentity): Long = 0
+
+ override def handleBackpressure(isBackpressured: Boolean): Unit = ()
+
+ override def initState(): Unit = ()
+
+ override def loadFromCheckpoint(chkpt: CheckpointState): Unit = ()
+ }
+
+ def createSourceOp(logicalOpId: String): PhysicalOp =
+ PhysicalOp.sourcePhysicalOp(
+ PhysicalOpIdentity(OperatorIdentity(logicalOpId), "main"),
+ DEFAULT_WORKFLOW_ID,
+ DEFAULT_EXECUTION_ID,
+ OpExecWithClassName("unused")
+ )
+
+ def createWorkerId(physicalOp: PhysicalOp): ActorVirtualIdentity =
+ VirtualIdentityUtils.createWorkerIdentity(DEFAULT_WORKFLOW_ID,
physicalOp.id, 0)
+
+ def createSingleWorkerRegion(
+ regionId: Long,
+ physicalOp: PhysicalOp,
+ workerId: ActorVirtualIdentity
+ ): Region =
+ Region(
+ RegionIdentity(regionId),
+ physicalOps = Set(physicalOp),
+ physicalLinks = Set.empty,
+ resourceConfig = Some(
+ ResourceConfig(
+ operatorConfigs = Map(physicalOp.id ->
OperatorConfig(List(WorkerConfig(workerId))))
+ )
+ )
+ )
+
+ def seedReusableWorkerExecution(
+ workflowExecution: WorkflowExecution,
+ seedRegionId: Long,
+ physicalOp: PhysicalOp,
+ workerId: ActorVirtualIdentity
+ ): Unit = {
+ // RegionExecutionCoordinator skips real worker creation when an execution
for this operator
+ // already exists.
+ workflowExecution
+ .initRegionExecution(createSingleWorkerRegion(seedRegionId, physicalOp,
workerId))
+ .initOperatorExecution(physicalOp.id)
+ .initWorkerExecution(workerId)
+ }
+
+ def await[T](future: Future[T]): T = Await.result(future, testTimeout)
+
+ def waitUntil(condition: => Boolean): Unit = {
+ val deadline = System.nanoTime() + testTimeout.inNanoseconds
+ while (!condition && System.nanoTime() < deadline) {
+ Thread.sleep(20)
+ }
+ assert(condition, s"condition not satisfied within $testTimeout")
+ }
+}
+
+trait RegionCoordinatorTestSupport { self: TestKit =>
+ import RegionCoordinatorTestSupport._
+
+ protected def createControllerHarness(): ControllerHarnessFixture = {
+ val controllerRef = TestActorRef(new ControllerHarness)
+ controllerRef.underlyingActor.actorService.getAvailableNodeAddressesFunc =
() =>
+ Array(controllerRef.path.address)
+ ControllerHarnessFixture(
+ actorService = controllerRef.underlyingActor.actorService,
+ actorRefService = controllerRef.underlyingActor.actorRefMappingService
+ )
+ }
+
+ protected def registerLiveWorker(
+ actorRefService: AkkaActorRefMappingService,
+ workerId: ActorVirtualIdentity
+ ): ActorRef = {
+ val workerRef = system.actorOf(Props(new IdleActor),
s"worker-${System.nanoTime()}")
+ actorRefService.registerActorRef(workerId, workerRef)
+ workerRef
+ }
+}
diff --git
a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinatorSpec.scala
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinatorSpec.scala
new file mode 100644
index 0000000000..8fab3b67fc
--- /dev/null
+++
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinatorSpec.scala
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.engine.architecture.scheduling
+
+import com.twitter.util.Future
+import org.apache.pekko.actor.ActorSystem
+import org.apache.pekko.testkit.TestKit
+import org.apache.texera.amber.core.virtualidentity.{ActorVirtualIdentity,
ChannelIdentity}
+import org.apache.texera.amber.core.workflow.PhysicalOp
+import
org.apache.texera.amber.engine.architecture.common.AkkaActorRefMappingService
+import org.apache.texera.amber.engine.architecture.controller.ControllerConfig
+import
org.apache.texera.amber.engine.architecture.controller.execution.WorkflowExecution
+import org.apache.texera.amber.engine.architecture.rpc.controlreturns._
+import
org.apache.texera.amber.engine.architecture.scheduling.RegionCoordinatorTestSupport._
+import
org.apache.texera.amber.engine.architecture.worker.statistics.WorkerState
+import org.apache.texera.amber.engine.common.AmberRuntime
+import org.apache.texera.amber.engine.common.virtualidentity.util.CONTROLLER
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.flatspec.AnyFlatSpecLike
+
+import java.util.concurrent.atomic
+
+/**
+ * Tests the real region-coordination lifecycle around synchronous region
kill.
+ *
+ * The tests let the coordinator call the real
`AsyncRPCClient.workerInterface`, capture the generated
+ * `ControlInvocation`s at the controller output gateway, and fulfill those
RPC promises
+ * explicitly. This keeps the important production behavior under test:
+ *
+ * - regular launch RPCs (`initializeExecutor`, `openExecutor`,
`startWorker`) are allowed to
+ * complete immediately;
+ * - `endWorker` can be held pending or failed to model worker-side
drain/termination behavior;
+ * - the real coordinator then decides when to remove actor refs, clean
control channels, mark
+ * workers terminated, and allow the next region to start.
+ */
+class RegionExecutionCoordinatorSpec
+ extends TestKit(ActorSystem("RegionExecutionCoordinatorSpec",
AmberRuntime.akkaConfig))
+ with AnyFlatSpecLike
+ with BeforeAndAfterAll
+ with RegionCoordinatorTestSupport {
+
+ override def afterAll(): Unit = {
+ TestKit.shutdownActorSystem(system)
+ }
+
+ "RegionExecutionCoordinator" should "send gracefulStop only after EndWorker
succeeds" in {
+ val fixture = createSingleRegionFixture(endWorkerResponse = _ => None)
+
+ launchRegion(fixture.coordinator)
+ val completion = requestRegionCompletion(fixture.coordinator)
+
+ assert(
+ fixture.rpcProbe.methodTrace == Seq(InitializeExecutor, OpenExecutor,
StartWorker, EndWorker)
+ )
+ assert(completion.poll.isEmpty)
+ assert(!fixture.coordinator.isCompleted)
+ assert(fixture.actorRefService.hasActorRef(fixture.workerId))
+
+ fixture.rpcProbe.fulfill(fixture.rpcProbe.onlyEndWorkerCall, EmptyReturn())
+ await(completion)
+
+ assert(fixture.coordinator.isCompleted)
+ assert(!fixture.actorRefService.hasActorRef(fixture.workerId))
+ assert(workerState(fixture) == WorkerState.TERMINATED)
+ assertControlChannelsAreRemoved(fixture)
+ }
+
+ it should "retry EndWorker failures and delay gracefulStop until a retry
succeeds" in {
+ val attempts = new atomic.AtomicInteger(0)
+ val fixture = createSingleRegionFixture(endWorkerResponse =
+ _ =>
+ if (attempts.incrementAndGet() == 1) {
+ Some(transientEndWorkerFailure)
+ } else {
+ None
+ }
+ )
+
+ launchRegion(fixture.coordinator)
+ val completion = requestRegionCompletion(fixture.coordinator)
+
+ waitUntil(fixture.rpcProbe.endWorkerCalls.size >= 2)
+ assert(completion.poll.isEmpty)
+ assert(!fixture.coordinator.isCompleted)
+ assert(fixture.actorRefService.hasActorRef(fixture.workerId))
+
+ fixture.rpcProbe.fulfill(fixture.rpcProbe.endWorkerCalls.last,
EmptyReturn())
+ await(completion)
+
+ assert(fixture.coordinator.isCompleted)
+ assert(fixture.rpcProbe.endWorkerCalls.size == 2)
+ assert(!fixture.actorRefService.hasActorRef(fixture.workerId))
+ assert(workerState(fixture) == WorkerState.TERMINATED)
+ }
+
+ private case class SingleRegionFixture(
+ coordinator: RegionExecutionCoordinator,
+ rpcProbe: ControllerRpcProbe,
+ workflowExecution: WorkflowExecution,
+ region: Region,
+ physicalOp: PhysicalOp,
+ workerId: ActorVirtualIdentity,
+ actorRefService: AkkaActorRefMappingService
+ )
+
+ private def createSingleRegionFixture(
+ endWorkerResponse: WorkerRpcCall => Option[ControlReturn]
+ ): SingleRegionFixture = {
+ val physicalOp = createSourceOp("test-op")
+ val workerId = createWorkerId(physicalOp)
+ val region = createSingleWorkerRegion(1, physicalOp, workerId)
+
+ val workflowExecution = WorkflowExecution()
+ seedReusableWorkerExecution(workflowExecution, seedRegionId = 0,
physicalOp, workerId)
+ workflowExecution.initRegionExecution(region)
+
+ val rpcProbe = new ControllerRpcProbe(endWorkerResponse)
+ val controller = createControllerHarness()
+ registerLiveWorker(controller.actorRefService, workerId)
+
+ // Seed stale control channels to verify that successful termination
removes them.
+ rpcProbe.inputGateway.getChannel(ChannelIdentity(workerId, CONTROLLER,
isControl = true))
+ rpcProbe.outputGateway.getSequenceNumber(
+ ChannelIdentity(CONTROLLER, workerId, isControl = true)
+ )
+
+ val coordinator = new RegionExecutionCoordinator(
+ region,
+ isRestart = false,
+ workflowExecution,
+ rpcProbe.asyncRPCClient,
+ ControllerConfig(None, None, None, None),
+ controller.actorService,
+ controller.actorRefService
+ )
+
+ SingleRegionFixture(
+ coordinator = coordinator,
+ rpcProbe = rpcProbe,
+ workflowExecution = workflowExecution,
+ region = region,
+ physicalOp = physicalOp,
+ workerId = workerId,
+ actorRefService = controller.actorRefService
+ )
+ }
+
+ private def launchRegion(coordinator: RegionExecutionCoordinator): Unit = {
+ await(coordinator.syncStatusAndTransitionRegionExecutionPhase())
+ }
+
+ private def requestRegionCompletion(
+ coordinator: RegionExecutionCoordinator
+ ): Future[Unit] = {
+ coordinator.syncStatusAndTransitionRegionExecutionPhase()
+ }
+
+ private def workerState(fixture: SingleRegionFixture): WorkerState =
+ fixture.workflowExecution
+ .getRegionExecution(fixture.region.id)
+ .getOperatorExecution(fixture.physicalOp.id)
+ .getWorkerExecution(fixture.workerId)
+ .getState
+
+ private def assertControlChannelsAreRemoved(fixture: SingleRegionFixture): Unit = {
+ assert(
+ !fixture.rpcProbe.inputGateway.getAllControlChannels.exists(
+ _.channelId == ChannelIdentity(fixture.workerId, CONTROLLER, isControl = true)
+ )
+ )
+ assert(
+ !fixture.rpcProbe.outputGateway.getActiveChannels.exists(
+ _ == ChannelIdentity(CONTROLLER, fixture.workerId, isControl = true)
+ )
+ )
+ }
+
+ private def transientEndWorkerFailure: ControlError =
+ ControlError(
+ errorMessage = "transient EndWorker failure",
+ errorDetails = "",
+ stackTrace = "",
+ language = ErrorLanguage.SCALA
+ )
+}
diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinatorSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinatorSpec.scala
new file mode 100644
index 0000000000..f4372e8b57
--- /dev/null
+++ b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinatorSpec.scala
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.engine.architecture.scheduling
+
+import org.apache.pekko.actor.ActorSystem
+import org.apache.pekko.testkit.TestKit
+import org.apache.texera.amber.engine.architecture.controller.ControllerConfig
+import org.apache.texera.amber.engine.architecture.controller.execution.WorkflowExecution
+import org.apache.texera.amber.engine.architecture.rpc.controlreturns.EmptyReturn
+import org.apache.texera.amber.engine.architecture.scheduling.RegionCoordinatorTestSupport._
+import org.apache.texera.amber.engine.common.AmberRuntime
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.flatspec.AnyFlatSpecLike
+
+import scala.collection.mutable
+
+class WorkflowExecutionCoordinatorSpec
+ extends TestKit(ActorSystem("WorkflowExecutionCoordinatorSpec", AmberRuntime.akkaConfig))
+ with AnyFlatSpecLike
+ with BeforeAndAfterAll
+ with RegionCoordinatorTestSupport {
+
+ override def afterAll(): Unit = {
+ TestKit.shutdownActorSystem(system)
+ }
+
+ "WorkflowExecutionCoordinator" should "start the next region only after previous region termination succeeds" in {
+ val firstOp = createSourceOp("first-op")
+ val firstWorkerId = createWorkerId(firstOp)
+ val firstRegion = createSingleWorkerRegion(1, firstOp, firstWorkerId)
+
+ val secondOp = createSourceOp("second-op")
+ val secondWorkerId = createWorkerId(secondOp)
+ val secondRegion = createSingleWorkerRegion(2, secondOp, secondWorkerId)
+
+ val workflowExecution = WorkflowExecution()
+ seedReusableWorkerExecution(workflowExecution, seedRegionId = 101, firstOp, firstWorkerId)
+ seedReusableWorkerExecution(workflowExecution, seedRegionId = 102, secondOp, secondWorkerId)
+
+ // First region's worker holds endWorker pending until we explicitly fulfill it; the second
+ // region's worker terminates immediately. This lets us assert the second region cannot start
+ // until termination of the first finishes.
+ val rpcProbe = new ControllerRpcProbe(
+ endWorkerResponse = call => if (call.receiver == firstWorkerId) None else Some(EmptyReturn())
+ )
+ val controller = createControllerHarness()
+ registerLiveWorker(controller.actorRefService, firstWorkerId)
+ registerLiveWorker(controller.actorRefService, secondWorkerId)
+
+ val nextRegionLevels = mutable.Queue(Set(firstRegion), Set(secondRegion))
+ val workflowCoordinator = new WorkflowExecutionCoordinator(
+ () => if (nextRegionLevels.nonEmpty) nextRegionLevels.dequeue() else Set.empty,
+ workflowExecution,
+ ControllerConfig(None, None, None, None),
+ rpcProbe.asyncRPCClient
+ )
+ workflowCoordinator.setupActorRefService(controller.actorRefService)
+
+ await(workflowCoordinator.coordinateRegionExecutors(controller.actorService))
+ assert(rpcProbe.startedWorkers == Seq(firstWorkerId))
+
+ val coordination = workflowCoordinator.coordinateRegionExecutors(controller.actorService)
+
+ waitUntil(rpcProbe.endWorkerCalls.size == 1)
+ assert(coordination.poll.isEmpty)
+ assert(!rpcProbe.initializedWorkers.contains(secondWorkerId))
+ assert(controller.actorRefService.hasActorRef(firstWorkerId))
+
+ rpcProbe.fulfill(rpcProbe.onlyEndWorkerCall, EmptyReturn())
+ await(coordination)
+
+ assert(!controller.actorRefService.hasActorRef(firstWorkerId))
+ assert(rpcProbe.initializedWorkers.contains(secondWorkerId))
+ assert(rpcProbe.startedWorkers.contains(secondWorkerId))
+ }
+}
diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/EndHandlerSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/EndHandlerSpec.scala
new file mode 100644
index 0000000000..90e8b817be
--- /dev/null
+++ b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/EndHandlerSpec.scala
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.engine.architecture.worker.promisehandlers
+
+import com.twitter.util.{Await, Duration, Future}
+import org.apache.texera.amber.core.virtualidentity.{ActorVirtualIdentity, ChannelIdentity}
+import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{
+ AsyncRPCContext,
+ EmptyRequest
+}
+import org.apache.texera.amber.engine.architecture.rpc.controlreturns.EmptyReturn
+import org.apache.texera.amber.engine.architecture.rpc.workerservice.WorkerServiceGrpc.METHOD_QUERY_STATISTICS
+import org.apache.texera.amber.engine.architecture.worker.WorkflowWorker.{
+ ActorCommandElement,
+ DPInputQueueElement,
+ FIFOMessageElement,
+ MainThreadDelegateMessage
+}
+import org.apache.texera.amber.engine.architecture.worker.{
+ DataProcessor,
+ DataProcessorRPCHandlerInitializer
+}
+import org.apache.texera.amber.engine.common.actormessage.Backpressure
+import org.apache.texera.amber.engine.common.ambermessage.WorkflowFIFOMessage
+import org.apache.texera.amber.engine.common.rpc.AsyncRPCClient.ControlInvocation
+import org.apache.texera.amber.engine.common.virtualidentity.util.CONTROLLER
+import org.scalatest.flatspec.AnyFlatSpec
+
+import java.util.concurrent.LinkedBlockingQueue
+
+/**
+ * `endWorker` is the controller's acknowledgement point before it sends actor-level `gracefulStop`.
+ *
+ * A successful reply means the worker has drained every queued workflow message. If the queue still contains work,
+ * the handler must fail so the region coordinator can retry the kill instead of stopping the actor too early.
+ */
+class EndHandlerSpec extends AnyFlatSpec {
+ private val workerId = ActorVirtualIdentity("Worker:WF1-test-op-main-0")
+ private val rpcContext = AsyncRPCContext(CONTROLLER, workerId)
+ private val awaitTimeout = Duration.fromSeconds(1)
+
+ private def createEndHandlerForQueue(
+ queue: LinkedBlockingQueue[DPInputQueueElement]
+ ): DataProcessorRPCHandlerInitializer = {
+ val outputHandler: Either[MainThreadDelegateMessage, WorkflowFIFOMessage] => Unit = _ => ()
+ val dp = new DataProcessor(workerId, outputHandler, queue)
+ new DataProcessorRPCHandlerInitializer(dp)
+ }
+
+ private def await[T](future: Future[T]): T = Await.result(future, awaitTimeout)
+
+ private def assertEndWorkerFails(handler: DataProcessorRPCHandlerInitializer): Unit = {
+ val exception = intercept[IllegalStateException] {
+ await(handler.endWorker(EmptyRequest(), rpcContext))
+ }
+ assert(exception.getMessage == "worker still has unprocessed messages")
+ }
+
+ private def queueWithFifoControlMessage(): LinkedBlockingQueue[DPInputQueueElement] = {
+ val queue = new LinkedBlockingQueue[DPInputQueueElement]()
+ queue.put(
+ FIFOMessageElement(
+ WorkflowFIFOMessage(
+ ChannelIdentity(CONTROLLER, workerId, isControl = true),
+ 0,
+ ControlInvocation(METHOD_QUERY_STATISTICS, EmptyRequest(), rpcContext, 1)
+ )
+ )
+ )
+ queue
+ }
+
+ private def queueWithActorCommand(): LinkedBlockingQueue[DPInputQueueElement] = {
+ val queue = new LinkedBlockingQueue[DPInputQueueElement]()
+ queue.put(ActorCommandElement(Backpressure(enableBackpressure = true)))
+ queue
+ }
+
+ "EndHandler" should "reply successfully when there are no unprocessed messages" in {
+ val handler = createEndHandlerForQueue(new LinkedBlockingQueue[DPInputQueueElement]())
+
+ assert(await(handler.endWorker(EmptyRequest(), rpcContext)) == EmptyReturn())
+ }
+
+ it should "fail when a FIFO control message is still queued" in {
+ val handler = createEndHandlerForQueue(queueWithFifoControlMessage())
+
+ assertEndWorkerFails(handler)
+ }
+
+ it should "fail when an actor command is still queued" in {
+ val handler = createEndHandlerForQueue(queueWithActorCommand())
+
+ assertEndWorkerFails(handler)
+ }
+}
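
The `EndHandlerSpec` above checks that `endWorker` succeeds only on an empty queue and otherwise fails so that the region coordinator can retry the kill. The following is a minimal, standalone sketch of that contract using only the Scala and Java standard libraries. It is not the Amber implementation: `EndWorkerSketch`, `QueuedMessage`, and `killWithRetry` are hypothetical names introduced for illustration, and the retry loop simulates worker progress by polling one message between attempts. Only the error message string is taken from the test above.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.annotation.tailrec
import scala.util.{Failure, Success, Try}

object EndWorkerSketch {
  // Hypothetical stand-in for the worker's input queue element type.
  final case class QueuedMessage(description: String)

  // Simplified endWorker: acknowledge only when nothing is left to process.
  def endWorker(queue: LinkedBlockingQueue[QueuedMessage]): Try[Unit] =
    if (queue.isEmpty) Success(())
    else Failure(new IllegalStateException("worker still has unprocessed messages"))

  // Hypothetical retry loop standing in for the coordinator's kill retry.
  // Returns the number of attempts used, or None if attempts run out.
  @tailrec
  def killWithRetry(
      queue: LinkedBlockingQueue[QueuedMessage],
      attemptsLeft: Int,
      attemptsUsed: Int = 0
  ): Option[Int] =
    if (attemptsLeft <= 0) None
    else
      endWorker(queue) match {
        case Success(_) => Some(attemptsUsed + 1)
        case Failure(_) =>
          // Assumption: the worker processes one message before the next attempt.
          queue.poll()
          killWithRetry(queue, attemptsLeft - 1, attemptsUsed + 1)
      }
}
```

With two messages queued, the first two attempts fail and the third succeeds once the queue is empty; the actor would only be stopped after that successful acknowledgement.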