This is an automated email from the ASF dual-hosted git repository.
Yicong-Huang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new 23901be24b fix(amber): wire ExecutionReconfigurationService back to the engine (#4531)
23901be24b is described below
commit 23901be24b23191496cd6c8110380a74eaf61de3
Author: Yicong Huang <[email protected]>
AuthorDate: Sun Apr 26 21:08:06 2026 -0700
fix(amber): wire ExecutionReconfigurationService back to the engine (#4531)
### What changes were proposed in this PR?
Follow-up to #4220, restoring the full reconfiguration flow end-to-end.
Two coupled gaps:
**1. Web-service entrypoint never dispatched.**
`ExecutionReconfigurationService.performReconfigurationOnResume` was
still throwing `"reconfiguration is tentatively disabled."` and the body
that calls `controllerInterface.reconfigureWorkflow` was commented out.
The body is restored and adapted to the current proto shape
(`UpdateExecutorRequest(targetOpId, newExecInitInfo)`, with no more
proto-Any boxing). After dispatch, it resets
`ExecutionReconfigurationStore` with a fresh `currentReconfigId`.
`StateTransferFunc` is dropped — the new request schema doesn't carry
it.
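
The restored dispatch flow can be sketched roughly as follows. This is a simplified Python model of the Scala logic, not the actual implementation: the class and function names mirror the diff, but the flat `(op_id, init_info)` tuples and the `dispatch` callable are assumptions made for illustration.

```python
import uuid
from dataclasses import dataclass, field
from typing import Callable, List, Optional, Set, Tuple


@dataclass(frozen=True)
class UpdateExecutorRequest:
    target_op_id: str
    new_exec_init_info: str


@dataclass(frozen=True)
class WorkflowReconfigureRequest:
    reconfiguration: Tuple[UpdateExecutorRequest, ...]
    reconfiguration_id: str


@dataclass
class ReconfigurationStore:
    # Hypothetical stand-in for ExecutionReconfigurationStore.
    current_reconfig_id: Optional[str] = None
    unscheduled: List[Tuple[str, str]] = field(default_factory=list)
    completed: Set[str] = field(default_factory=set)


def perform_reconfiguration_on_resume(
    store: ReconfigurationStore,
    dispatch: Callable[[WorkflowReconfigureRequest], None],
) -> ReconfigurationStore:
    """Send all pending reconfigurations in one request, then reset the store."""
    if not store.unscheduled:
        return store  # nothing pending: no dispatch, store unchanged
    reconfig_id = str(uuid.uuid4())
    requests = tuple(
        UpdateExecutorRequest(op_id, info) for op_id, info in store.unscheduled
    )
    dispatch(WorkflowReconfigureRequest(requests, reconfig_id))
    # A fresh store clears the pending list and the completed set.
    return ReconfigurationStore(current_reconfig_id=reconfig_id)
```

Passing `dispatch` in as a parameter plays the same role as the overridable `dispatch` method in the Scala service: the caller decides whether the request reaches a live controller or a recording list.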
**2. Engine never reported per-worker completion.**
`ReconfigurationHandler` collected the worker `updateExecutor` futures
but only returned `EmptyReturn` when they all finished — no
`UpdateExecutorCompleted(worker)` events were ever sent to the client.
Without those,
`ExecutionReconfigurationService.completedReconfigurations` stayed
empty, the diff handler never fired, and the frontend never saw
`ModifyLogicCompletedEvent`. The `UpdateExecutorCompleted` case class
was effectively dead code.
Each per-worker future is now wrapped by `notifyOnComplete`, which calls
`sendToClient(UpdateExecutorCompleted(worker))` when that future succeeds,
in both Fries-component branches (single-op and multi-op). The web-service
`client.registerCallback[UpdateExecutorCompleted]` is re-enabled to
advance `completedReconfigurations` on receipt.
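
The per-worker notification pattern can be illustrated with a small Python analogue. It uses `concurrent.futures.Future` in place of the Twitter `Future` used by the engine, and the `send_to_client` callable and the tuple-shaped event are assumptions for illustration only.

```python
from concurrent.futures import Future
from typing import Callable, Tuple


def notify_on_complete(
    future: Future,
    worker: str,
    send_to_client: Callable[[Tuple[str, str]], None],
) -> Future:
    """Attach a success-only side effect and hand back the same future."""

    def _on_done(done: Future) -> None:
        # Only successful completions are reported; failures and
        # cancellations produce no event.
        if not done.cancelled() and done.exception() is None:
            send_to_client(("UpdateExecutorCompleted", worker))

    future.add_done_callback(_on_done)
    return future
```

Because the wrapper returns the original future, the caller can keep collecting futures exactly as before and still wait for all of them to finish.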
### Any related issues, documentation, discussions?
Follow-up to #4220. See discussion #4016.
### How was this PR tested?
`ExecutionReconfigurationServiceSpec` (new) covers:
- empty pending list → no dispatch, store unchanged;
- non-empty list → one dispatch carrying the right `(targetOpId,
newExecInitInfo)` pairs, store reset with a fresh `currentReconfigId`;
- consecutive resumes get distinct `reconfigurationId`s;
- worker completion (`onWorkerReconfigured`) updates
`completedReconfigurations` with Set semantics (idempotent on
duplicates).
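
The Set semantics in the last bullet, together with the diff handler they feed, can be modelled as below. This is a hedged Python sketch: the `worker_to_op` mapping stands in for `workflow.physicalPlan.getPhysicalOpByWorkerId`, and sorting the result is an assumption added only to make the output deterministic.

```python
from typing import FrozenSet, List, Mapping, Optional


def newly_completed_ops(
    old_completed: FrozenSet[str],
    new_completed: FrozenSet[str],
    old_reconfig_id: Optional[str],
    new_reconfig_id: Optional[str],
    worker_to_op: Mapping[str, str],
) -> List[str]:
    """Return logical ops whose workers completed in this state transition."""
    # Ignore transitions where nothing changed or where the reconfiguration
    # id changed (that is a store reset, not a worker completion).
    if old_completed == new_completed or old_reconfig_id != new_reconfig_id:
        return []
    diff = new_completed - old_completed
    return sorted({worker_to_op[worker] for worker in diff})
```

A duplicate completion leaves the set unchanged, so the function returns an empty list and no second event would be emitted for that worker.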
The test uses three protected seams (`dispatch`,
`registerWorkerCompletionCallback`, `registerCompletionDiffHandler`) so
the service can be constructed without a live `AmberClient` or
`Workflow`.
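
The seam approach can be sketched in Python as a subclass that overrides the hooks touching external collaborators. All names here are illustrative stand-ins for the Scala classes, and the `client` methods `reconfigure_workflow` and `register_callback` are hypothetical.

```python
class ReconfigurationService:
    def __init__(self, client, store):
        self.client = client
        self.store = store
        # Called during construction, so a test double must override it
        # before any real client is needed.
        self.register_worker_completion_callback()

    def dispatch(self, request):
        self.client.reconfigure_workflow(request)

    def register_worker_completion_callback(self):
        self.client.register_callback(self.on_worker_reconfigured)

    def on_worker_reconfigured(self, worker):
        self.store["completed"].add(worker)


class RecordingService(ReconfigurationService):
    """Test double: records dispatches and never touches the client."""

    def __init__(self, store):
        self.captured = []
        super().__init__(client=None, store=store)

    def dispatch(self, request):
        self.captured.append(request)

    def register_worker_completion_callback(self):
        pass  # skip registration; tests call on_worker_reconfigured directly
```

Keeping `on_worker_reconfigured` as a separate method means the completion path can be driven without any callback registration, which is the same reason the Scala version exposes `onWorkerReconfigured`.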
End-to-end engine path is covered by `ReconfigurationSpec` from #4220;
the new `sendToClient` calls are no-ops when no callback is registered,
so existing assertions are unaffected.
### Was this PR authored or co-authored using generative AI tooling?
Generated-by: Claude Code (claude-opus-4-7)
---------
Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]>
---
.../core/architecture/rpc/async_rpc_server.py | 21 ++-
.../promisehandlers/ReconfigurationHandler.scala | 47 ++++---
.../engine/architecture/worker/DataProcessor.scala | 11 +-
.../service/ExecutionReconfigurationService.scala | 136 +++++++++++--------
.../amber/engine/e2e/ReconfigurationSpec.scala | 18 ++-
.../ExecutionReconfigurationServiceSpec.scala | 149 +++++++++++++++++++++
6 files changed, 299 insertions(+), 83 deletions(-)
diff --git a/amber/src/main/python/core/architecture/rpc/async_rpc_server.py b/amber/src/main/python/core/architecture/rpc/async_rpc_server.py
index d776307030..49dc5f0547 100644
--- a/amber/src/main/python/core/architecture/rpc/async_rpc_server.py
+++ b/amber/src/main/python/core/architecture/rpc/async_rpc_server.py
@@ -121,10 +121,23 @@ class AsyncRPCServer:
if self._no_reply_needed(control_invocation.command_id):
return
- # Reply to the sender.
- target_channel_id = ChannelIdentity(
- from_.to_worker_id, from_.from_worker_id, True
- )
+ # Reply to the actor that originated this ControlInvocation, identified
+ # by control_invocation.context.sender. For a normal RPC over a
+ # control channel this matches `from_.from_worker_id`; for an
+ # invocation carried in-band by an ECM along a data channel, `from_`
+ # is the data channel between two workers and the original sender
+ # lives only in the invocation's context.
+ # When the context is unset (e.g. unit-test inputs that construct
+ # ControlInvocation directly), fall back to swapping `from_`.
+ ctx = control_invocation.context
+ if ctx.sender.name and ctx.receiver.name:
+ target_channel_id = ChannelIdentity(
+ ctx.receiver, ctx.sender, is_control=True
+ )
+ else:
+ target_channel_id = ChannelIdentity(
+ from_.to_worker_id, from_.from_worker_id, is_control=True
+ )
logger.debug(
f"PYTHON returns a ReturnInvocation {payload}, replying the command"
f" {command}"
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/ReconfigurationHandler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/ReconfigurationHandler.scala
index 210d7c5b98..7653f873c1 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/ReconfigurationHandler.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/ReconfigurationHandler.scala
@@ -21,16 +21,21 @@ package org.apache.texera.amber.engine.architecture.controller.promisehandlers
import com.twitter.util.Future
import org.apache.texera.amber.core.virtualidentity.{
+ ActorVirtualIdentity,
ChannelIdentity,
EmbeddedControlMessageIdentity
}
-import org.apache.texera.amber.engine.architecture.controller.ControllerAsyncRPCHandlerInitializer
+import org.apache.texera.amber.engine.architecture.controller.{
+ ControllerAsyncRPCHandlerInitializer,
+ UpdateExecutorCompleted
+}
import org.apache.texera.amber.engine.architecture.rpc.controlcommands.EmbeddedControlMessageType.ALL_ALIGNMENT
import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{
AsyncRPCContext,
+ ControlInvocation,
WorkflowReconfigureRequest
}
-import org.apache.texera.amber.engine.architecture.rpc.controlreturns.EmptyReturn
+import org.apache.texera.amber.engine.architecture.rpc.controlreturns.{ControlReturn, EmptyReturn}
import org.apache.texera.amber.engine.common.FriesReconfigurationAlgorithm
import org.apache.texera.amber.engine.common.virtualidentity.util.CONTROLLER
import org.apache.texera.amber.util.VirtualIdentityUtils
@@ -64,7 +69,12 @@ trait ReconfigurationHandler {
.getLatestOperatorExecution(updateExecutorRequest.targetOpId)
.getWorkerIds
workerIds.foreach { worker =>
- futures.append(workerInterface.updateExecutor(updateExecutorRequest, mkContext(worker)))
+ futures.append(
+ notifyOnComplete(
+ workerInterface.updateExecutor(updateExecutorRequest, mkContext(worker)),
+ worker
+ )
+ )
}
} else {
val channelScope = cp.workflowExecution.getRunningRegionExecutions
@@ -88,20 +98,21 @@ trait ReconfigurationHandler {
}
}
val finalScope = channelScope ++ controlChannels
- val cmdMapping =
+ val workerCommands: Seq[(ActorVirtualIdentity, ControlInvocation, Future[ControlReturn])] =
friesComponent.reconfigurations.flatMap { updateReq =>
val workers =
cp.workflowExecution.getLatestOperatorExecution(updateReq.targetOpId).getWorkerIds
- workers.map(worker =>
- worker.name -> createInvocation(
- METHOD_UPDATE_EXECUTOR.getBareMethodName,
- updateReq,
- worker
- )
- )
- }.toMap
- futures += cmdMapping.map {
- case (_, (_, singleWorkerUpdateFuture)) => singleWorkerUpdateFuture
+ workers.map { worker =>
+ val (invocation, future) =
+ createInvocation(METHOD_UPDATE_EXECUTOR.getBareMethodName, updateReq, worker)
+ (worker, invocation, future)
+ }
+ }.toSeq
+ val cmdMapping: Map[String, ControlInvocation] = workerCommands.map {
+ case (worker, invocation, _) => worker.name -> invocation
+ }.toMap
+ futures ++= workerCommands.map {
+ case (worker, _, future) => notifyOnComplete(future, worker)
}
friesComponent.sources.foreach { source =>
cp.workflowExecution.getLatestOperatorExecution(source).getWorkerIds.foreach { worker =>
@@ -109,7 +120,7 @@ trait ReconfigurationHandler {
EmbeddedControlMessageIdentity(msg.reconfigurationId),
ALL_ALIGNMENT,
finalScope.toSet,
- cmdMapping.map(x => (x._1, x._2._1)),
+ cmdMapping,
ChannelIdentity(actorId, worker, isControl = true)
)
}
@@ -121,4 +132,10 @@ trait ReconfigurationHandler {
}
}
+ // After a worker's updateExecutor completes, notify the client so the
+ // ExecutionReconfigurationService can advance completedReconfigurations
+ // and emit ModifyLogicCompletedEvent on the websocket.
+ private def notifyOnComplete[T](future: Future[T], worker: ActorVirtualIdentity): Future[T] =
+ future.onSuccess(_ => sendToClient(UpdateExecutorCompleted(worker)))
+
}
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala
index 3aa5fa90a4..84f1e8ec65 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala
@@ -242,7 +242,16 @@ class DataProcessor(
// invoke the control command carried with the ECM
logger.info(s"process ECM from $channelId, id = ${ecm.id}, cmd = $command")
if (command.isDefined) {
- asyncRPCServer.receive(command.get, channelId.fromWorkerId)
+ // The reply must go back to the actor that originated the invocation
+ // (recorded in command.context.sender), not to channelId.fromWorkerId.
+ // For ECM-embedded commands those differ: channelId is the data
+ // channel between two workers, while the originator is typically the
+ // controller. Fall back to the channel sender when the context is
+ // unset (e.g. unit-test inputs).
+ val ctx = command.get.context
+ val replyTo =
+ if (ctx.sender.name.nonEmpty) ctx.sender else channelId.fromWorkerId
+ asyncRPCServer.receive(command.get, replyTo)
}
// if this worker is not the final destination of the ECM, pass it downstream
val downstreamChannelsInScope = ecm.scope.filter(_.fromWorkerId == actorId).toSet
diff --git a/amber/src/main/scala/org/apache/texera/web/service/ExecutionReconfigurationService.scala b/amber/src/main/scala/org/apache/texera/web/service/ExecutionReconfigurationService.scala
index e5867277fc..e7617fdfe1 100644
--- a/amber/src/main/scala/org/apache/texera/web/service/ExecutionReconfigurationService.scala
+++ b/amber/src/main/scala/org/apache/texera/web/service/ExecutionReconfigurationService.scala
@@ -19,7 +19,12 @@
package org.apache.texera.web.service
-import org.apache.texera.amber.engine.architecture.controller.Workflow
+import org.apache.texera.amber.core.virtualidentity.ActorVirtualIdentity
+import org.apache.texera.amber.engine.architecture.controller.{UpdateExecutorCompleted, Workflow}
+import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{
+ UpdateExecutorRequest,
+ WorkflowReconfigureRequest
+}
import org.apache.texera.amber.engine.common.client.AmberClient
import org.apache.texera.web.SubscriptionManager
import org.apache.texera.web.model.websocket.event.TexeraWebSocketEvent
@@ -28,8 +33,9 @@ import org.apache.texera.web.model.websocket.response.{
ModifyLogicCompletedEvent,
ModifyLogicResponse
}
-import org.apache.texera.web.storage.ExecutionStateStore
+import org.apache.texera.web.storage.{ExecutionReconfigurationStore, ExecutionStateStore}
+import java.util.UUID
import scala.util.{Failure, Success}
class ExecutionReconfigurationService(
@@ -39,34 +45,11 @@ class ExecutionReconfigurationService(
) extends SubscriptionManager {
// monitors notification from the engine that a reconfiguration on a worker is completed
- // client.registerCallback[UpdateExecutorCompleted]((evt: UpdateExecutorCompleted) => {
- // stateStore.reconfigurationStore.updateState(old => {
- // old.copy(completedReconfigurations = old.completedReconfigurations + evt.id)
- // })
- // })
+ registerWorkerCompletionCallback()
// monitors the reconfiguration state (completed workers) change,
// notifies the frontend when all workers of an operator complete reconfiguration
- addSubscription(
- stateStore.reconfigurationStore.registerDiffHandler((oldState, newState) => {
- if (
- oldState.completedReconfigurations != newState.completedReconfigurations
- && oldState.currentReconfigId == newState.currentReconfigId
- ) {
- val diff = newState.completedReconfigurations -- oldState.completedReconfigurations
- val newlyCompletedOps = diff
- .map(workerId => workflow.physicalPlan.getPhysicalOpByWorkerId(workerId).id)
- .map(opId => opId.logicalOpId.id)
- if (newlyCompletedOps.nonEmpty) {
- List(ModifyLogicCompletedEvent(newlyCompletedOps.toList))
- } else {
- List()
- }
- } else {
- List()
- }
- })
- )
+ registerCompletionDiffHandler()
// handles reconfigure workflow logic from frontend
// validate the modify logic request and notifies the frontend
@@ -96,42 +79,77 @@ class ExecutionReconfigurationService(
// actually performs all reconfiguration requests the user made during pause
// sends ModifyLogic messages to operators and workers,
- // there are two modes: transactional or non-transactional
- // in the transactional mode, reconfigurations on multiple operators will be synchronized
- // in the non-transaction mode, they are not synchronized, this is faster, but can lead to consistency issues
- // for details, see the Fries reconfiguration paper
+ // see the Fries reconfiguration paper for the algorithm.
+ // Note: StateTransferFunc is currently not threaded through to the engine —
+ // the new UpdateExecutorRequest only carries (targetOpId, newExecInitInfo).
def performReconfigurationOnResume(): Unit = {
val reconfigurations =
stateStore.reconfigurationStore.getState.unscheduledReconfigurations
if (reconfigurations.isEmpty) {
return
}
- throw new RuntimeException("reconfiguration is tentatively disabled.")
- // // schedule all pending reconfigurations to the engine
- // val reconfigurationId = UUID.randomUUID().toString
- // val modifyLogicReq = AmberModifyLogicRequest(reconfigurations.map {
- // case (op, stateTransferFunc) =>
- // val bytes = AmberRuntime.serde.serialize(op.opExecInitInfo).get
- // val protoAny = Any.of(
- // "org.apache.texera.amber.engine.architecture.deploysemantics.layer.OpExecInitInfo",
- // ByteString.copyFrom(bytes)
- // )
- // val stateTransferFuncOpt = stateTransferFunc.map { func =>
- // val bytes = AmberRuntime.serde.serialize(func).get
- // Any.of(
- // "org.apache.texera.workflow.common.operators.StateTransferFunc",
- // ByteString.copyFrom(bytes)
- // )
- // }
- // UpdateExecutorRequest(op.id, protoAny, stateTransferFuncOpt)
- // })
- // client.controllerInterface.reconfigureWorkflow(
- // WorkflowReconfigureRequest(modifyLogicReq, reconfigurationId),
- // ()
- // )
- //
- // // clear all un-scheduled reconfigurations, start a new reconfiguration ID
- // stateStore.reconfigurationStore.updateState(_ =>
- // ExecutionReconfigurationStore(Some(reconfigurationId))
- // )
+
+ val reconfigurationId = UUID.randomUUID().toString
+ val updateExecutorRequests = reconfigurations.map {
+ case (op, _) => UpdateExecutorRequest(op.id, op.opExecInitInfo)
+ }
+ dispatch(
+ WorkflowReconfigureRequest(
+ reconfiguration = updateExecutorRequests,
+ reconfigurationId = reconfigurationId
+ )
+ )
+
+ // clear all un-scheduled reconfigurations, start a new reconfiguration ID
+ stateStore.reconfigurationStore.updateState(_ =>
+ ExecutionReconfigurationStore(currentReconfigId = Some(reconfigurationId))
+ )
+ }
+
+ // Seam for unit testing the dispatch path without spinning up an AmberClient.
+ protected def dispatch(request: WorkflowReconfigureRequest): Unit = {
+ client.controllerInterface.reconfigureWorkflow(request, ())
+ }
+
+ // Seam for unit testing — production wires the engine's
UpdateExecutorCompleted
+ // events into the reconfiguration store so the diff handler above can fire
+ // ModifyLogicCompletedEvent for the frontend.
+ protected def registerWorkerCompletionCallback(): Unit = {
+ client.registerCallback[UpdateExecutorCompleted]((evt: UpdateExecutorCompleted) => {
+ onWorkerReconfigured(evt.id)
+ })
+ }
+
+ // Exposed (instead of inlined in the callback) so tests can drive the
+ // completion path directly.
+ private[service] def onWorkerReconfigured(worker: ActorVirtualIdentity): Unit = {
+ stateStore.reconfigurationStore.updateState(old =>
+ old.copy(completedReconfigurations = old.completedReconfigurations + worker)
+ )
+ }
+
+ // Seam for unit testing — the diff handler dereferences
workflow.physicalPlan
+ // to map worker → logical op, which makes constructing a service in tests
+ // require a full Workflow. Tests override to no-op.
+ protected def registerCompletionDiffHandler(): Unit = {
+ addSubscription(
+ stateStore.reconfigurationStore.registerDiffHandler((oldState, newState) => {
+ if (
+ oldState.completedReconfigurations != newState.completedReconfigurations
+ && oldState.currentReconfigId == newState.currentReconfigId
+ ) {
+ val diff = newState.completedReconfigurations -- oldState.completedReconfigurations
+ val newlyCompletedOps = diff
+ .map(workerId => workflow.physicalPlan.getPhysicalOpByWorkerId(workerId).id)
+ .map(opId => opId.logicalOpId.id)
+ if (newlyCompletedOps.nonEmpty) {
+ List(ModifyLogicCompletedEvent(newlyCompletedOps.toList))
+ } else {
+ List()
+ }
+ } else {
+ List()
+ }
+ })
+ )
}
}
diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ReconfigurationSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ReconfigurationSpec.scala
index 6f344caae3..92dfba19de 100644
--- a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ReconfigurationSpec.scala
+++ b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ReconfigurationSpec.scala
@@ -150,9 +150,15 @@ class ReconfigurationSpec
completion.setDone()
}
})
- Await.result(client.controllerInterface.startWorkflow(EmptyRequest(), ()))
+ Await.result(
+ client.controllerInterface.startWorkflow(EmptyRequest(), ()),
+ Duration.fromSeconds(5)
+ )
val pausedReached = stateReached(client, PAUSED)
- Await.result(client.controllerInterface.pauseWorkflow(EmptyRequest(), ()))
+ Await.result(
+ client.controllerInterface.pauseWorkflow(EmptyRequest(), ()),
+ Duration.fromSeconds(5)
+ )
Await.result(pausedReached, Duration.fromSeconds(10))
val physicalOps = targetOps.flatMap(op =>
workflow.physicalPlan.getPhysicalOpsOfLogicalOp(op.operatorIdentifier)
@@ -164,9 +170,13 @@ class ReconfigurationSpec
reconfigurationId = "test-reconfigure-1"
),
()
- )
+ ),
+ Duration.fromSeconds(5)
+ )
+ Await.result(
+ client.controllerInterface.resumeWorkflow(EmptyRequest(), ()),
+ Duration.fromSeconds(5)
)
- Await.result(client.controllerInterface.resumeWorkflow(EmptyRequest(), ()))
Await.result(completion, Duration.fromMinutes(1))
result
}
diff --git a/amber/src/test/scala/org/apache/texera/web/service/ExecutionReconfigurationServiceSpec.scala b/amber/src/test/scala/org/apache/texera/web/service/ExecutionReconfigurationServiceSpec.scala
new file mode 100644
index 0000000000..974db13286
--- /dev/null
+++ b/amber/src/test/scala/org/apache/texera/web/service/ExecutionReconfigurationServiceSpec.scala
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.web.service
+
+import org.apache.texera.amber.core.executor.OpExecWithClassName
+import org.apache.texera.amber.core.virtualidentity.{
+ ActorVirtualIdentity,
+ ExecutionIdentity,
+ OperatorIdentity,
+ PhysicalOpIdentity,
+ WorkflowIdentity
+}
+import org.apache.texera.amber.core.workflow.PhysicalOp
+import org.apache.texera.amber.engine.architecture.rpc.controlcommands.WorkflowReconfigureRequest
+import org.apache.texera.web.storage.{ExecutionReconfigurationStore, ExecutionStateStore}
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should.Matchers
+
+import scala.collection.mutable.ArrayBuffer
+
+/**
+ * Web-service-layer tests for ExecutionReconfigurationService.
+ *
+ * The end-to-end engine path (reconfigureWorkflow → Fries algorithm →
+ * UpdateExecutor on workers) is covered by ReconfigurationSpec.
+ * This spec focuses on the wiring inside performReconfigurationOnResume:
+ * empty short-circuit, request construction, and store reset semantics.
+ */
+class ExecutionReconfigurationServiceSpec extends AnyFlatSpec with Matchers {
+
+ private def mkPhysicalOp(name: String): PhysicalOp =
+ PhysicalOp(
+ id = PhysicalOpIdentity(OperatorIdentity(name), "main"),
+ workflowId = WorkflowIdentity(0L),
+ executionId = ExecutionIdentity(0L),
+ opExecInitInfo = OpExecWithClassName(s"$name.Class", "")
+ )
+
+ /** Service variant that records dispatched requests and skips the AmberClient
+ * registration / workflow-dependent diff handler so it can be constructed
+ * without a live engine.
+ */
+ private class RecordingService(stateStore: ExecutionStateStore)
+ extends ExecutionReconfigurationService(client = null, stateStore, workflow = null) {
+ val captured: ArrayBuffer[WorkflowReconfigureRequest] = ArrayBuffer.empty
+ override protected def dispatch(request: WorkflowReconfigureRequest): Unit =
+ captured += request
+ override protected def registerWorkerCompletionCallback(): Unit = ()
+ override protected def registerCompletionDiffHandler(): Unit = ()
+ }
+
+ "performReconfigurationOnResume" should
+ "return without dispatching when no reconfigurations are pending" in {
+ val stateStore = new ExecutionStateStore()
+ val service = new RecordingService(stateStore)
+
+ noException should be thrownBy service.performReconfigurationOnResume()
+
+ service.captured shouldBe empty
+ val state = stateStore.reconfigurationStore.getState
+ state.unscheduledReconfigurations shouldBe empty
+ state.currentReconfigId shouldBe None
+ state.completedReconfigurations shouldBe empty
+ }
+
+ it should "dispatch one request carrying every pending reconfiguration and reset the store" in {
+ val stateStore = new ExecutionStateStore()
+ val service = new RecordingService(stateStore)
+
+ val op1 = mkPhysicalOp("op-1")
+ val op2 = mkPhysicalOp("op-2")
+ stateStore.reconfigurationStore.updateState(_ =>
+ ExecutionReconfigurationStore(unscheduledReconfigurations = List((op1, None), (op2, None)))
+ )
+
+ service.performReconfigurationOnResume()
+
+ service.captured should have size 1
+ val request = service.captured.head
+ request.reconfigurationId should not be empty
+ request.reconfiguration.map(_.targetOpId) should contain theSameElementsInOrderAs Seq(
+ op1.id,
+ op2.id
+ )
+ request.reconfiguration.map(_.newExecInitInfo) should contain theSameElementsInOrderAs Seq(
+ op1.opExecInitInfo,
+ op2.opExecInitInfo
+ )
+
+ val state = stateStore.reconfigurationStore.getState
+ state.unscheduledReconfigurations shouldBe empty
+ state.currentReconfigId shouldBe Some(request.reconfigurationId)
+ state.completedReconfigurations shouldBe empty
+ }
+
+ it should "use a fresh reconfigurationId on each dispatch" in {
+ val stateStore = new ExecutionStateStore()
+ val service = new RecordingService(stateStore)
+
+ def queueAndDispatch(opName: String): String = {
+ stateStore.reconfigurationStore.updateState(old =>
+ old.copy(unscheduledReconfigurations = List((mkPhysicalOp(opName), None)))
+ )
+ service.performReconfigurationOnResume()
+ service.captured.last.reconfigurationId
+ }
+
+ val firstId = queueAndDispatch("op-a")
+ val secondId = queueAndDispatch("op-b")
+
+ firstId should not be secondId
+ stateStore.reconfigurationStore.getState.currentReconfigId shouldBe Some(secondId)
+ }
+
+ "onWorkerReconfigured" should
+ "add the worker id to completedReconfigurations so the diff handler can fire" in {
+ val stateStore = new ExecutionStateStore()
+ val service = new RecordingService(stateStore)
+
+ val w1 = ActorVirtualIdentity("Worker:WF1-E1-op-main-0")
+ val w2 = ActorVirtualIdentity("Worker:WF1-E1-op-main-1")
+ service.onWorkerReconfigured(w1)
+ service.onWorkerReconfigured(w2)
+ // duplicate completion is idempotent (Set semantics).
+ service.onWorkerReconfigured(w1)
+
+ stateStore.reconfigurationStore.getState.completedReconfigurations should contain theSameElementsAs Set(
+ w1,
+ w2
+ )
+ }
+}