This is an automated email from the ASF dual-hosted git repository.

Yicong-Huang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/main by this push:
     new 23901be24b fix(amber): wire ExecutionReconfigurationService back to the engine (#4531)
23901be24b is described below

commit 23901be24b23191496cd6c8110380a74eaf61de3
Author: Yicong Huang <[email protected]>
AuthorDate: Sun Apr 26 21:08:06 2026 -0700

    fix(amber): wire ExecutionReconfigurationService back to the engine (#4531)
    
    ### What changes were proposed in this PR?
    
    Follow-up to #4220, restoring the full reconfiguration flow end-to-end.
    Two coupled gaps:
    
    **1. Web-service entrypoint never dispatched.**
    `ExecutionReconfigurationService.performReconfigurationOnResume` still
    threw `"reconfiguration is tentatively disabled."`, and the body that
    calls `controllerInterface.reconfigureWorkflow` was commented out. The
    body is restored and adapted to the current proto shape
    (`UpdateExecutorRequest(targetOpId, newExecInitInfo)`, with no more
    proto-Any boxing). After dispatch, the service resets
    `ExecutionReconfigurationStore` with a fresh `currentReconfigId`.
    
    `StateTransferFunc` is dropped because the new request schema does not
    carry it.
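    
    The restored resume path can be sketched in Python as a pure function,
    assuming simplified stand-ins: the `UpdateExecutorRequest`,
    `WorkflowReconfigureRequest` and `ReconfigState` dataclasses below are
    hypothetical, not the generated proto or store classes, and `dispatch`
    plays the role of `controllerInterface.reconfigureWorkflow`. It shows
    the empty short-circuit, one request per pending operator with the
    state-transfer function discarded, a single dispatch, and the reset to
    a store holding only a fresh id.

```python
import uuid
from dataclasses import dataclass
from typing import Callable, List, Optional, Tuple


# Hypothetical stand-ins for the Scala/proto types; field names are simplified.
@dataclass(frozen=True)
class UpdateExecutorRequest:
    target_op_id: str
    new_exec_init_info: str


@dataclass(frozen=True)
class WorkflowReconfigureRequest:
    reconfiguration: List[UpdateExecutorRequest]
    reconfiguration_id: str


@dataclass(frozen=True)
class ReconfigState:
    # Each pending entry is (op_id, exec_init_info, optional state-transfer func).
    unscheduled: Tuple[Tuple[str, str, Optional[Callable]], ...] = ()
    current_reconfig_id: Optional[str] = None
    completed: frozenset = frozenset()


def perform_reconfiguration_on_resume(
    state: ReconfigState, dispatch: Callable[[WorkflowReconfigureRequest], None]
) -> ReconfigState:
    """Return the next store state, dispatching at most one request."""
    if not state.unscheduled:
        return state  # nothing pending: no dispatch, store unchanged
    reconfig_id = str(uuid.uuid4())
    requests = [
        # The state-transfer function is ignored: the new schema has no slot for it.
        UpdateExecutorRequest(op_id, init_info)
        for op_id, init_info, _ in state.unscheduled
    ]
    dispatch(WorkflowReconfigureRequest(requests, reconfig_id))
    # Fresh store: pending list and completed set cleared, new current id.
    return ReconfigState(current_reconfig_id=reconfig_id)
```

    Returning a whole new state mirrors the
    `updateState(_ => ExecutionReconfigurationStore(...))` call, which
    replaces the store rather than patching it.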
    
    **2. Engine never reported per-worker completion.**
    `ReconfigurationHandler` collected the workers' `updateExecutor` futures
    but only returned `EmptyReturn` once all of them finished; no
    `UpdateExecutorCompleted(worker)` events were ever sent to the client.
    Without those events,
    `ExecutionReconfigurationStore.completedReconfigurations` stayed
    empty, the diff handler never fired, and the frontend never saw
    `ModifyLogicCompletedEvent`. The `UpdateExecutorCompleted` case class
    was effectively dead code.
    
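    Each per-worker future is now wrapped so that, once it succeeds, it
    calls `sendToClient(UpdateExecutorCompleted(worker))`. This applies to
    both Fries-component branches (single-op and multi-op). The web-service
    `client.registerCallback[UpdateExecutorCompleted]` is re-enabled to
    advance `completedReconfigurations` on receipt.
    
    Replies to commands embedded in an ECM are also rerouted. In the
    multi-op branch the `updateExecutor` invocation travels in-band along a
    data channel, so the channel's sender is an upstream worker rather than
    the actor that issued the command (typically the controller).
    `DataProcessor` and the Python `AsyncRPCServer` now reply to the
    originator recorded in the invocation's context (`context.sender`),
    falling back to the channel's sender when the context is unset (e.g.
    in unit tests).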
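    
    The per-worker wrapping in `ReconfigurationHandler` can be sketched with
    Python's `concurrent.futures.Future`, used only as a stand-in for
    Twitter's `Future.onSuccess`; `notify_on_complete` and the
    `send_to_client` callable are hypothetical names. The callback fires
    only when a future succeeds, so each successfully updated worker yields
    one event, and the same future is returned so the caller can still
    collect it.

```python
from concurrent.futures import Future
from typing import Callable


def notify_on_complete(
    future: Future, worker: str, send_to_client: Callable[[str], None]
) -> Future:
    """Emit a per-worker completion event when `future` succeeds."""

    def _on_done(done: Future) -> None:
        # Like onSuccess: ignore cancelled or failed futures.
        if not done.cancelled() and done.exception() is None:
            send_to_client(worker)

    # add_done_callback runs immediately if the future is already finished.
    future.add_done_callback(_on_done)
    return future  # callers keep collecting the same future


# Usage: three workers, one of which fails its update.
events = []
futures = [notify_on_complete(Future(), w, events.append) for w in ("w0", "w1", "w2")]
futures[0].set_result("ok")
futures[2].set_exception(RuntimeError("update failed"))
futures[1].set_result("ok")
# events == ["w0", "w1"]
```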
    
    ### Any related issues, documentation, discussions?
    
    Follow-up to #4220. See discussion #4016.
    
    ### How was this PR tested?
    
    `ExecutionReconfigurationServiceSpec` (new) covers:
    - empty pending list → no dispatch, store unchanged;
    - non-empty list → one dispatch carrying the right `(targetOpId,
    newExecInitInfo)` pairs, store reset with a fresh `currentReconfigId`;
    - consecutive resumes get distinct `reconfigurationId`s;
    - worker completion (`onWorkerReconfigured`) updates
    `completedReconfigurations` with Set semantics (idempotent on
    duplicates).
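    
    The bookkeeping these cases exercise can be sketched as two pure
    functions, assuming plain-string ids and a hypothetical `worker_to_op`
    dict standing in for the `workflow.physicalPlan` lookup. Set union makes
    duplicate completions harmless, and, as in the diff handler, operators
    are reported only when the completed set grew under an unchanged
    reconfiguration id. Like that handler, the sketch reports an operator as
    soon as any of its workers is newly completed; it does not check that
    all of them have finished.

```python
from typing import Dict, FrozenSet, List, Optional


def on_worker_reconfigured(completed: FrozenSet[str], worker: str) -> FrozenSet[str]:
    # Set union: reporting the same worker twice changes nothing.
    return completed | {worker}


def newly_completed_ops(
    old_completed: FrozenSet[str],
    old_id: Optional[str],
    new_completed: FrozenSet[str],
    new_id: Optional[str],
    worker_to_op: Dict[str, str],
) -> List[str]:
    """Logical ops to report in a ModifyLogicCompletedEvent (empty: no event)."""
    if old_completed == new_completed or old_id != new_id:
        return []  # unchanged set, or the store was reset for a new reconfiguration
    # Several workers of one op collapse into one id; sorted for stable output.
    return sorted({worker_to_op[w] for w in new_completed - old_completed})
```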
    
    The test uses three protected seams (`dispatch`,
    `registerWorkerCompletionCallback`, `registerCompletionDiffHandler`) so
    the service can be constructed without a live `AmberClient` or
    `Workflow`.
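    
    The seam pattern can be sketched in Python as a base class whose
    constructor calls overridable hooks, plus a recording subclass;
    `ServiceSketch`, `RecordingService` and the `client` methods are
    hypothetical. One caveat carries over to Scala: hooks invoked from the
    base constructor run before the subclass's own fields are initialised,
    so overrides called there should not touch subclass state. The no-op
    overrides in the spec's `RecordingService` respect this, since
    `captured` is only used later by `dispatch`.

```python
from typing import Any, List


class ServiceSketch:
    def __init__(self, client: Any) -> None:
        self.client = client
        self.pending: List[str] = []
        # Hooks run during base construction, i.e. inside the subclass's super() call.
        self.register_worker_completion_callback()
        self.register_completion_diff_handler()

    def perform_on_resume(self) -> None:
        if not self.pending:
            return
        self.dispatch(list(self.pending))
        self.pending.clear()

    # Production hooks talk to a live client (hypothetical client methods).
    def dispatch(self, request: List[str]) -> None:
        self.client.reconfigure_workflow(request)

    def register_worker_completion_callback(self) -> None:
        self.client.register_callback("UpdateExecutorCompleted", print)

    def register_completion_diff_handler(self) -> None:
        self.client.subscribe_to_store()


class RecordingService(ServiceSketch):
    """Test double: no client needed, dispatched requests are captured."""

    def __init__(self) -> None:
        self.captured: List[List[str]] = []  # set before the base hooks run
        super().__init__(client=None)

    def dispatch(self, request: List[str]) -> None:
        self.captured.append(request)

    def register_worker_completion_callback(self) -> None:
        pass  # skip client registration

    def register_completion_diff_handler(self) -> None:
        pass  # skip the workflow-dependent handler
```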
    
    The end-to-end engine path is covered by `ReconfigurationSpec` from
    #4220, which this commit only changes to put explicit timeouts on its
    `Await.result` calls. The new `sendToClient` calls are no-ops when no
    callback is registered, so existing assertions are unaffected.
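    
    Why the extra `sendToClient` calls are harmless when nothing is
    registered can be sketched with a minimal type-keyed callback registry,
    assuming the client hands each event only to handlers registered for
    its class. `CallbackRegistry` and the `UpdateExecutorCompleted`
    dataclass here are hypothetical stand-ins, not `AmberClient`'s
    implementation.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Callable, DefaultDict, List, Type


@dataclass(frozen=True)
class UpdateExecutorCompleted:
    worker: str


class CallbackRegistry:
    def __init__(self) -> None:
        self._handlers: DefaultDict[Type, List[Callable]] = defaultdict(list)

    def register_callback(self, event_type: Type, handler: Callable) -> None:
        self._handlers[event_type].append(handler)

    def send_to_client(self, event: object) -> None:
        # .get avoids inserting an empty entry; no handlers means nothing happens.
        for handler in self._handlers.get(type(event), []):
            handler(event)
```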
    
    ### Was this PR authored or co-authored using generative AI tooling?
    
    Generated-by: Claude Code (claude-opus-4-7)
    
    ---------
    
    Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]>
---
 .../core/architecture/rpc/async_rpc_server.py      |  21 ++-
 .../promisehandlers/ReconfigurationHandler.scala   |  47 ++++---
 .../engine/architecture/worker/DataProcessor.scala |  11 +-
 .../service/ExecutionReconfigurationService.scala  | 136 +++++++++++--------
 .../amber/engine/e2e/ReconfigurationSpec.scala     |  18 ++-
 .../ExecutionReconfigurationServiceSpec.scala      | 149 +++++++++++++++++++++
 6 files changed, 299 insertions(+), 83 deletions(-)

diff --git a/amber/src/main/python/core/architecture/rpc/async_rpc_server.py b/amber/src/main/python/core/architecture/rpc/async_rpc_server.py
index d776307030..49dc5f0547 100644
--- a/amber/src/main/python/core/architecture/rpc/async_rpc_server.py
+++ b/amber/src/main/python/core/architecture/rpc/async_rpc_server.py
@@ -121,10 +121,23 @@ class AsyncRPCServer:
         if self._no_reply_needed(control_invocation.command_id):
             return
 
-        # Reply to the sender.
-        target_channel_id = ChannelIdentity(
-            from_.to_worker_id, from_.from_worker_id, True
-        )
+        # Reply to the actor that originated this ControlInvocation, identified
+        # by control_invocation.context.sender. For a normal RPC over a
+        # control channel this matches `from_.from_worker_id`; for an
+        # invocation carried in-band by an ECM along a data channel, `from_`
+        # is the data channel between two workers and the original sender
+        # lives only in the invocation's context.
+        # When the context is unset (e.g. unit-test inputs that construct
+        # ControlInvocation directly), fall back to swapping `from_`.
+        ctx = control_invocation.context
+        if ctx.sender.name and ctx.receiver.name:
+            target_channel_id = ChannelIdentity(
+                ctx.receiver, ctx.sender, is_control=True
+            )
+        else:
+            target_channel_id = ChannelIdentity(
+                from_.to_worker_id, from_.from_worker_id, is_control=True
+            )
         logger.debug(
             f"PYTHON returns a ReturnInvocation {payload}, replying the command"
             f" {command}"
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/ReconfigurationHandler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/ReconfigurationHandler.scala
index 210d7c5b98..7653f873c1 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/ReconfigurationHandler.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/ReconfigurationHandler.scala
@@ -21,16 +21,21 @@ package org.apache.texera.amber.engine.architecture.controller.promisehandlers
 
 import com.twitter.util.Future
 import org.apache.texera.amber.core.virtualidentity.{
+  ActorVirtualIdentity,
   ChannelIdentity,
   EmbeddedControlMessageIdentity
 }
-import org.apache.texera.amber.engine.architecture.controller.ControllerAsyncRPCHandlerInitializer
+import org.apache.texera.amber.engine.architecture.controller.{
+  ControllerAsyncRPCHandlerInitializer,
+  UpdateExecutorCompleted
+}
 import org.apache.texera.amber.engine.architecture.rpc.controlcommands.EmbeddedControlMessageType.ALL_ALIGNMENT
 import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{
   AsyncRPCContext,
+  ControlInvocation,
   WorkflowReconfigureRequest
 }
-import org.apache.texera.amber.engine.architecture.rpc.controlreturns.EmptyReturn
+import org.apache.texera.amber.engine.architecture.rpc.controlreturns.{ControlReturn, EmptyReturn}
 import org.apache.texera.amber.engine.common.FriesReconfigurationAlgorithm
 import org.apache.texera.amber.engine.common.virtualidentity.util.CONTROLLER
 import org.apache.texera.amber.util.VirtualIdentityUtils
@@ -64,7 +69,12 @@ trait ReconfigurationHandler {
           .getLatestOperatorExecution(updateExecutorRequest.targetOpId)
           .getWorkerIds
         workerIds.foreach { worker =>
-          futures.append(workerInterface.updateExecutor(updateExecutorRequest, mkContext(worker)))
+          futures.append(
+            notifyOnComplete(
+              workerInterface.updateExecutor(updateExecutorRequest, mkContext(worker)),
+              worker
+            )
+          )
         }
       } else {
         val channelScope = cp.workflowExecution.getRunningRegionExecutions
@@ -88,20 +98,21 @@ trait ReconfigurationHandler {
           }
         }
         val finalScope = channelScope ++ controlChannels
-        val cmdMapping =
+        val workerCommands: Seq[(ActorVirtualIdentity, ControlInvocation, Future[ControlReturn])] =
           friesComponent.reconfigurations.flatMap { updateReq =>
             val workers =
               cp.workflowExecution.getLatestOperatorExecution(updateReq.targetOpId).getWorkerIds
-            workers.map(worker =>
-              worker.name -> createInvocation(
-                METHOD_UPDATE_EXECUTOR.getBareMethodName,
-                updateReq,
-                worker
-              )
-            )
-          }.toMap
-        futures += cmdMapping.map {
-          case (_, (_, singleWorkerUpdateFuture)) => singleWorkerUpdateFuture
+            workers.map { worker =>
+              val (invocation, future) =
+                createInvocation(METHOD_UPDATE_EXECUTOR.getBareMethodName, updateReq, worker)
+              (worker, invocation, future)
+            }
+          }.toSeq
+        val cmdMapping: Map[String, ControlInvocation] = workerCommands.map {
+          case (worker, invocation, _) => worker.name -> invocation
+        }.toMap
+        futures ++= workerCommands.map {
+          case (worker, _, future) => notifyOnComplete(future, worker)
         }
         friesComponent.sources.foreach { source =>
           cp.workflowExecution.getLatestOperatorExecution(source).getWorkerIds.foreach { worker =>
@@ -109,7 +120,7 @@ trait ReconfigurationHandler {
               EmbeddedControlMessageIdentity(msg.reconfigurationId),
               ALL_ALIGNMENT,
               finalScope.toSet,
-              cmdMapping.map(x => (x._1, x._2._1)),
+              cmdMapping,
               ChannelIdentity(actorId, worker, isControl = true)
             )
           }
@@ -121,4 +132,10 @@ trait ReconfigurationHandler {
     }
   }
 
+  // After a worker's updateExecutor completes, notify the client so the
+  // ExecutionReconfigurationService can advance completedReconfigurations
+  // and emit ModifyLogicCompletedEvent on the websocket.
+  private def notifyOnComplete[T](future: Future[T], worker: ActorVirtualIdentity): Future[T] =
+    future.onSuccess(_ => sendToClient(UpdateExecutorCompleted(worker)))
+
 }
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala
index 3aa5fa90a4..84f1e8ec65 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala
@@ -242,7 +242,16 @@ class DataProcessor(
       // invoke the control command carried with the ECM
       logger.info(s"process ECM from $channelId, id = ${ecm.id}, cmd = $command")
       if (command.isDefined) {
-        asyncRPCServer.receive(command.get, channelId.fromWorkerId)
+        // The reply must go back to the actor that originated the invocation
+        // (recorded in command.context.sender), not to channelId.fromWorkerId.
+        // For ECM-embedded commands those differ: channelId is the data
+        // channel between two workers, while the originator is typically the
+        // controller. Fall back to the channel sender when the context is
+        // unset (e.g. unit-test inputs).
+        val ctx = command.get.context
+        val replyTo =
+          if (ctx.sender.name.nonEmpty) ctx.sender else channelId.fromWorkerId
+        asyncRPCServer.receive(command.get, replyTo)
       }
       // if this worker is not the final destination of the ECM, pass it downstream
       val downstreamChannelsInScope = ecm.scope.filter(_.fromWorkerId == actorId).toSet
diff --git a/amber/src/main/scala/org/apache/texera/web/service/ExecutionReconfigurationService.scala b/amber/src/main/scala/org/apache/texera/web/service/ExecutionReconfigurationService.scala
index e5867277fc..e7617fdfe1 100644
--- a/amber/src/main/scala/org/apache/texera/web/service/ExecutionReconfigurationService.scala
+++ b/amber/src/main/scala/org/apache/texera/web/service/ExecutionReconfigurationService.scala
@@ -19,7 +19,12 @@
 
 package org.apache.texera.web.service
 
-import org.apache.texera.amber.engine.architecture.controller.Workflow
+import org.apache.texera.amber.core.virtualidentity.ActorVirtualIdentity
+import org.apache.texera.amber.engine.architecture.controller.{UpdateExecutorCompleted, Workflow}
+import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{
+  UpdateExecutorRequest,
+  WorkflowReconfigureRequest
+}
 import org.apache.texera.amber.engine.common.client.AmberClient
 import org.apache.texera.web.SubscriptionManager
 import org.apache.texera.web.model.websocket.event.TexeraWebSocketEvent
@@ -28,8 +33,9 @@ import org.apache.texera.web.model.websocket.response.{
   ModifyLogicCompletedEvent,
   ModifyLogicResponse
 }
-import org.apache.texera.web.storage.ExecutionStateStore
+import org.apache.texera.web.storage.{ExecutionReconfigurationStore, ExecutionStateStore}
 
+import java.util.UUID
 import scala.util.{Failure, Success}
 
 class ExecutionReconfigurationService(
@@ -39,34 +45,11 @@ class ExecutionReconfigurationService(
 ) extends SubscriptionManager {
 
   // monitors notification from the engine that a reconfiguration on a worker is completed
-  //  client.registerCallback[UpdateExecutorCompleted]((evt: UpdateExecutorCompleted) => {
-  //    stateStore.reconfigurationStore.updateState(old => {
-  //      old.copy(completedReconfigurations = old.completedReconfigurations + evt.id)
-  //    })
-  //  })
+  registerWorkerCompletionCallback()
 
   // monitors the reconfiguration state (completed workers) change,
   // notifies the frontend when all workers of an operator complete reconfiguration
-  addSubscription(
-    stateStore.reconfigurationStore.registerDiffHandler((oldState, newState) => {
-      if (
-        oldState.completedReconfigurations != newState.completedReconfigurations
-        && oldState.currentReconfigId == newState.currentReconfigId
-      ) {
-        val diff = newState.completedReconfigurations -- oldState.completedReconfigurations
-        val newlyCompletedOps = diff
-          .map(workerId => workflow.physicalPlan.getPhysicalOpByWorkerId(workerId).id)
-          .map(opId => opId.logicalOpId.id)
-        if (newlyCompletedOps.nonEmpty) {
-          List(ModifyLogicCompletedEvent(newlyCompletedOps.toList))
-        } else {
-          List()
-        }
-      } else {
-        List()
-      }
-    })
-  )
+  registerCompletionDiffHandler()
 
   // handles reconfigure workflow logic from frontend
   // validate the modify logic request and notifies the frontend
@@ -96,42 +79,77 @@ class ExecutionReconfigurationService(
 
   // actually performs all reconfiguration requests the user made during pause
   // sends ModifyLogic messages to operators and workers,
-  // there are two modes: transactional or non-transactional
-  // in the transactional mode, reconfigurations on multiple operators will be synchronized
-  // in the non-transaction mode, they are not synchronized, this is faster, but can lead to consistency issues
-  // for details, see the Fries reconfiguration paper
+  // see the Fries reconfiguration paper for the algorithm.
+  // Note: StateTransferFunc is currently not threaded through to the engine —
+  // the new UpdateExecutorRequest only carries (targetOpId, newOpExecInitInfo).
   def performReconfigurationOnResume(): Unit = {
     val reconfigurations = stateStore.reconfigurationStore.getState.unscheduledReconfigurations
     if (reconfigurations.isEmpty) {
       return
     }
-    throw new RuntimeException("reconfiguration is tentatively disabled.")
-    //    // schedule all pending reconfigurations to the engine
-    //    val reconfigurationId = UUID.randomUUID().toString
-    //    val modifyLogicReq = AmberModifyLogicRequest(reconfigurations.map {
-    //      case (op, stateTransferFunc) =>
-    //        val bytes = AmberRuntime.serde.serialize(op.opExecInitInfo).get
-    //        val protoAny = Any.of(
-    //          "org.apache.texera.amber.engine.architecture.deploysemantics.layer.OpExecInitInfo",
-    //          ByteString.copyFrom(bytes)
-    //        )
-    //        val stateTransferFuncOpt = stateTransferFunc.map { func =>
-    //          val bytes = AmberRuntime.serde.serialize(func).get
-    //          Any.of(
-    //            "org.apache.texera.workflow.common.operators.StateTransferFunc",
-    //            ByteString.copyFrom(bytes)
-    //          )
-    //        }
-    //        UpdateExecutorRequest(op.id, protoAny, stateTransferFuncOpt)
-    //    })
-    //    client.controllerInterface.reconfigureWorkflow(
-    //      WorkflowReconfigureRequest(modifyLogicReq, reconfigurationId),
-    //      ()
-    //    )
-    //
-    //    // clear all un-scheduled reconfigurations, start a new reconfiguration ID
-    //    stateStore.reconfigurationStore.updateState(_ =>
-    //      ExecutionReconfigurationStore(Some(reconfigurationId))
-    //    )
+
+    val reconfigurationId = UUID.randomUUID().toString
+    val updateExecutorRequests = reconfigurations.map {
+      case (op, _) => UpdateExecutorRequest(op.id, op.opExecInitInfo)
+    }
+    dispatch(
+      WorkflowReconfigureRequest(
+        reconfiguration = updateExecutorRequests,
+        reconfigurationId = reconfigurationId
+      )
+    )
+
+    // clear all un-scheduled reconfigurations, start a new reconfiguration ID
+    stateStore.reconfigurationStore.updateState(_ =>
+      ExecutionReconfigurationStore(currentReconfigId = Some(reconfigurationId))
+    )
+  }
+
+  // Seam for unit testing the dispatch path without spinning up an AmberClient.
+  protected def dispatch(request: WorkflowReconfigureRequest): Unit = {
+    client.controllerInterface.reconfigureWorkflow(request, ())
+  }
+
+  // Seam for unit testing — production wires the engine's UpdateExecutorCompleted
+  // events into the reconfiguration store so the diff handler above can fire
+  // ModifyLogicCompletedEvent for the frontend.
+  protected def registerWorkerCompletionCallback(): Unit = {
+    client.registerCallback[UpdateExecutorCompleted]((evt: UpdateExecutorCompleted) => {
+      onWorkerReconfigured(evt.id)
+    })
+  }
+
+  // Exposed (instead of inlined in the callback) so tests can drive the
+  // completion path directly.
+  private[service] def onWorkerReconfigured(worker: ActorVirtualIdentity): Unit = {
+    stateStore.reconfigurationStore.updateState(old =>
+      old.copy(completedReconfigurations = old.completedReconfigurations + worker)
+    )
+  }
+
+  // Seam for unit testing — the diff handler dereferences workflow.physicalPlan
+  // to map worker → logical op, which makes constructing a service in tests
+  // require a full Workflow. Tests override to no-op.
+  protected def registerCompletionDiffHandler(): Unit = {
+    addSubscription(
+      stateStore.reconfigurationStore.registerDiffHandler((oldState, newState) => {
+        if (
+          oldState.completedReconfigurations != newState.completedReconfigurations
+          && oldState.currentReconfigId == newState.currentReconfigId
+        ) {
+          val diff = newState.completedReconfigurations -- oldState.completedReconfigurations
+          val newlyCompletedOps = diff
+            .map(workerId => workflow.physicalPlan.getPhysicalOpByWorkerId(workerId).id)
+            .map(opId => opId.logicalOpId.id)
+          if (newlyCompletedOps.nonEmpty) {
+            List(ModifyLogicCompletedEvent(newlyCompletedOps.toList))
+          } else {
+            List()
+          }
+        } else {
+          List()
+        }
+      })
+    )
   }
 }
diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ReconfigurationSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ReconfigurationSpec.scala
index 6f344caae3..92dfba19de 100644
--- a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ReconfigurationSpec.scala
+++ b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ReconfigurationSpec.scala
@@ -150,9 +150,15 @@ class ReconfigurationSpec
           completion.setDone()
         }
       })
-    Await.result(client.controllerInterface.startWorkflow(EmptyRequest(), ()))
+    Await.result(
+      client.controllerInterface.startWorkflow(EmptyRequest(), ()),
+      Duration.fromSeconds(5)
+    )
     val pausedReached = stateReached(client, PAUSED)
-    Await.result(client.controllerInterface.pauseWorkflow(EmptyRequest(), ()))
+    Await.result(
+      client.controllerInterface.pauseWorkflow(EmptyRequest(), ()),
+      Duration.fromSeconds(5)
+    )
     Await.result(pausedReached, Duration.fromSeconds(10))
     val physicalOps = targetOps.flatMap(op =>
       workflow.physicalPlan.getPhysicalOpsOfLogicalOp(op.operatorIdentifier)
@@ -164,9 +170,13 @@ class ReconfigurationSpec
           reconfigurationId = "test-reconfigure-1"
         ),
         ()
-      )
+      ),
+      Duration.fromSeconds(5)
+    )
+    Await.result(
+      client.controllerInterface.resumeWorkflow(EmptyRequest(), ()),
+      Duration.fromSeconds(5)
     )
-    Await.result(client.controllerInterface.resumeWorkflow(EmptyRequest(), ()))
     Await.result(completion, Duration.fromMinutes(1))
     result
   }
diff --git a/amber/src/test/scala/org/apache/texera/web/service/ExecutionReconfigurationServiceSpec.scala b/amber/src/test/scala/org/apache/texera/web/service/ExecutionReconfigurationServiceSpec.scala
new file mode 100644
index 0000000000..974db13286
--- /dev/null
+++ b/amber/src/test/scala/org/apache/texera/web/service/ExecutionReconfigurationServiceSpec.scala
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.web.service
+
+import org.apache.texera.amber.core.executor.OpExecWithClassName
+import org.apache.texera.amber.core.virtualidentity.{
+  ActorVirtualIdentity,
+  ExecutionIdentity,
+  OperatorIdentity,
+  PhysicalOpIdentity,
+  WorkflowIdentity
+}
+import org.apache.texera.amber.core.workflow.PhysicalOp
+import org.apache.texera.amber.engine.architecture.rpc.controlcommands.WorkflowReconfigureRequest
+import org.apache.texera.web.storage.{ExecutionReconfigurationStore, ExecutionStateStore}
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should.Matchers
+
+import scala.collection.mutable.ArrayBuffer
+
+/**
+  * Web-service-layer tests for ExecutionReconfigurationService.
+  *
+  * The end-to-end engine path (reconfigureWorkflow → Fries algorithm →
+  * UpdateExecutor on workers) is covered by ReconfigurationSpec.
+  * This spec focuses on the wiring inside performReconfigurationOnResume:
+  * empty short-circuit, request construction, and store reset semantics.
+  */
+class ExecutionReconfigurationServiceSpec extends AnyFlatSpec with Matchers {
+
+  private def mkPhysicalOp(name: String): PhysicalOp =
+    PhysicalOp(
+      id = PhysicalOpIdentity(OperatorIdentity(name), "main"),
+      workflowId = WorkflowIdentity(0L),
+      executionId = ExecutionIdentity(0L),
+      opExecInitInfo = OpExecWithClassName(s"$name.Class", "")
+    )
+
+  /** Service variant that records dispatched requests and skips the AmberClient
+    * registration / workflow-dependent diff handler so it can be constructed
+    * without a live engine.
+    */
+  private class RecordingService(stateStore: ExecutionStateStore)
+      extends ExecutionReconfigurationService(client = null, stateStore, workflow = null) {
+    val captured: ArrayBuffer[WorkflowReconfigureRequest] = ArrayBuffer.empty
+    override protected def dispatch(request: WorkflowReconfigureRequest): Unit =
+      captured += request
+    override protected def registerWorkerCompletionCallback(): Unit = ()
+    override protected def registerCompletionDiffHandler(): Unit = ()
+  }
+
+  "performReconfigurationOnResume" should
+    "return without dispatching when no reconfigurations are pending" in {
+    val stateStore = new ExecutionStateStore()
+    val service = new RecordingService(stateStore)
+
+    noException should be thrownBy service.performReconfigurationOnResume()
+
+    service.captured shouldBe empty
+    val state = stateStore.reconfigurationStore.getState
+    state.unscheduledReconfigurations shouldBe empty
+    state.currentReconfigId shouldBe None
+    state.completedReconfigurations shouldBe empty
+  }
+
+  it should "dispatch one request carrying every pending reconfiguration and reset the store" in {
+    val stateStore = new ExecutionStateStore()
+    val service = new RecordingService(stateStore)
+
+    val op1 = mkPhysicalOp("op-1")
+    val op2 = mkPhysicalOp("op-2")
+    stateStore.reconfigurationStore.updateState(_ =>
+      ExecutionReconfigurationStore(unscheduledReconfigurations = List((op1, None), (op2, None)))
+    )
+
+    service.performReconfigurationOnResume()
+
+    service.captured should have size 1
+    val request = service.captured.head
+    request.reconfigurationId should not be empty
+    request.reconfiguration.map(_.targetOpId) should contain theSameElementsInOrderAs Seq(
+      op1.id,
+      op2.id
+    )
+    request.reconfiguration.map(_.newExecInitInfo) should contain theSameElementsInOrderAs Seq(
+      op1.opExecInitInfo,
+      op2.opExecInitInfo
+    )
+
+    val state = stateStore.reconfigurationStore.getState
+    state.unscheduledReconfigurations shouldBe empty
+    state.currentReconfigId shouldBe Some(request.reconfigurationId)
+    state.completedReconfigurations shouldBe empty
+  }
+
+  it should "use a fresh reconfigurationId on each dispatch" in {
+    val stateStore = new ExecutionStateStore()
+    val service = new RecordingService(stateStore)
+
+    def queueAndDispatch(opName: String): String = {
+      stateStore.reconfigurationStore.updateState(old =>
+        old.copy(unscheduledReconfigurations = List((mkPhysicalOp(opName), None)))
+      )
+      service.performReconfigurationOnResume()
+      service.captured.last.reconfigurationId
+    }
+
+    val firstId = queueAndDispatch("op-a")
+    val secondId = queueAndDispatch("op-b")
+
+    firstId should not be secondId
+    stateStore.reconfigurationStore.getState.currentReconfigId shouldBe Some(secondId)
+  }
+
+  "onWorkerReconfigured" should
+    "add the worker id to completedReconfigurations so the diff handler can fire" in {
+    val stateStore = new ExecutionStateStore()
+    val service = new RecordingService(stateStore)
+
+    val w1 = ActorVirtualIdentity("Worker:WF1-E1-op-main-0")
+    val w2 = ActorVirtualIdentity("Worker:WF1-E1-op-main-1")
+    service.onWorkerReconfigured(w1)
+    service.onWorkerReconfigured(w2)
+    // duplicate completion is idempotent (Set semantics).
+    service.onWorkerReconfigured(w1)
+
+    stateStore.reconfigurationStore.getState.completedReconfigurations should contain theSameElementsAs Set(
+      w1,
+      w2
+    )
+  }
+}
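
The reply-routing rule added to `async_rpc_server.py` and `DataProcessor.scala` in this commit can be sketched as a pure Python function. `ChannelIdentity` here is a simplified dataclass with plain-string actor ids rather than the generated proto class, and `reply_channel` is a hypothetical helper. It follows the Python side, which trusts the invocation context only when both its sender and receiver are set.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ChannelIdentity:
    # Simplified stand-in: actor ids are plain strings.
    from_worker_id: str
    to_worker_id: str
    is_control: bool


def reply_channel(
    ctx_sender: str, ctx_receiver: str, arrived_on: ChannelIdentity
) -> ChannelIdentity:
    """Pick the control channel on which to return a ReturnInvocation."""
    if ctx_sender and ctx_receiver:
        # The context names the originator (e.g. the controller), even when
        # the command rode in-band with an ECM along a data channel.
        return ChannelIdentity(ctx_receiver, ctx_sender, is_control=True)
    # Context unset (e.g. hand-built test inputs): reverse the arrival channel.
    return ChannelIdentity(
        arrived_on.to_worker_id, arrived_on.from_worker_id, is_control=True
    )
```

The Scala `DataProcessor` change checks only `ctx.sender.name.nonEmpty` before replying to `ctx.sender`.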
