This is an automated email from the ASF dual-hosted git repository. Yicong-Huang pushed a commit to branch revert-4531-fix/reconfiguration-rpc-wiring in repository https://gitbox.apache.org/repos/asf/texera.git
commit 7ad2b600ab6ec974ec17edb2acc712c747e090d6 Author: Yicong Huang <[email protected]> AuthorDate: Mon Apr 27 23:10:52 2026 -0700 Revert "fix(amber): wire ExecutionReconfigurationService back to the engine (…" This reverts commit 23901be24b23191496cd6c8110380a74eaf61de3. --- .../core/architecture/rpc/async_rpc_server.py | 21 +-- .../promisehandlers/ReconfigurationHandler.scala | 47 +++---- .../engine/architecture/worker/DataProcessor.scala | 11 +- .../service/ExecutionReconfigurationService.scala | 136 ++++++++----------- .../amber/engine/e2e/ReconfigurationSpec.scala | 18 +-- .../ExecutionReconfigurationServiceSpec.scala | 149 --------------------- 6 files changed, 83 insertions(+), 299 deletions(-) diff --git a/amber/src/main/python/core/architecture/rpc/async_rpc_server.py b/amber/src/main/python/core/architecture/rpc/async_rpc_server.py index 49dc5f0547..d776307030 100644 --- a/amber/src/main/python/core/architecture/rpc/async_rpc_server.py +++ b/amber/src/main/python/core/architecture/rpc/async_rpc_server.py @@ -121,23 +121,10 @@ class AsyncRPCServer: if self._no_reply_needed(control_invocation.command_id): return - # Reply to the actor that originated this ControlInvocation, identified - # by control_invocation.context.sender. For a normal RPC over a - # control channel this matches `from_.from_worker_id`; for an - # invocation carried in-band by an ECM along a data channel, `from_` - # is the data channel between two workers and the original sender - # lives only in the invocation's context. - # When the context is unset (e.g. unit-test inputs that construct - # ControlInvocation directly), fall back to swapping `from_`. - ctx = control_invocation.context - if ctx.sender.name and ctx.receiver.name: - target_channel_id = ChannelIdentity( - ctx.receiver, ctx.sender, is_control=True - ) - else: - target_channel_id = ChannelIdentity( - from_.to_worker_id, from_.from_worker_id, is_control=True - ) + # Reply to the sender. + target_channel_id = ChannelIdentity( + from_.to_worker_id, from_.from_worker_id, True + ) logger.debug( f"PYTHON returns a ReturnInvocation {payload}, replying the command" f" {command}" diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/ReconfigurationHandler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/ReconfigurationHandler.scala index 7653f873c1..210d7c5b98 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/ReconfigurationHandler.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/ReconfigurationHandler.scala @@ -21,21 +21,16 @@ package org.apache.texera.amber.engine.architecture.controller.promisehandlers import com.twitter.util.Future import org.apache.texera.amber.core.virtualidentity.{ - ActorVirtualIdentity, ChannelIdentity, EmbeddedControlMessageIdentity } -import org.apache.texera.amber.engine.architecture.controller.{ - ControllerAsyncRPCHandlerInitializer, - UpdateExecutorCompleted -} +import org.apache.texera.amber.engine.architecture.controller.ControllerAsyncRPCHandlerInitializer import org.apache.texera.amber.engine.architecture.rpc.controlcommands.EmbeddedControlMessageType.ALL_ALIGNMENT import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{ AsyncRPCContext, - ControlInvocation, WorkflowReconfigureRequest } -import org.apache.texera.amber.engine.architecture.rpc.controlreturns.{ControlReturn, EmptyReturn} +import org.apache.texera.amber.engine.architecture.rpc.controlreturns.EmptyReturn import org.apache.texera.amber.engine.common.FriesReconfigurationAlgorithm import org.apache.texera.amber.engine.common.virtualidentity.util.CONTROLLER import org.apache.texera.amber.util.VirtualIdentityUtils @@ -69,12 +64,7 @@ trait ReconfigurationHandler { .getLatestOperatorExecution(updateExecutorRequest.targetOpId) .getWorkerIds workerIds.foreach { worker => - futures.append( - notifyOnComplete( - workerInterface.updateExecutor(updateExecutorRequest, mkContext(worker)), - worker - ) - ) + futures.append(workerInterface.updateExecutor(updateExecutorRequest, mkContext(worker))) } } else { val channelScope = cp.workflowExecution.getRunningRegionExecutions @@ -98,21 +88,20 @@ trait ReconfigurationHandler { } } val finalScope = channelScope ++ controlChannels - val workerCommands: Seq[(ActorVirtualIdentity, ControlInvocation, Future[ControlReturn])] = + val cmdMapping = friesComponent.reconfigurations.flatMap { updateReq => val workers = cp.workflowExecution.getLatestOperatorExecution(updateReq.targetOpId).getWorkerIds - workers.map { worker => - val (invocation, future) = - createInvocation(METHOD_UPDATE_EXECUTOR.getBareMethodName, updateReq, worker) - (worker, invocation, future) - } - }.toSeq - val cmdMapping: Map[String, ControlInvocation] = workerCommands.map { - case (worker, invocation, _) => worker.name -> invocation - }.toMap - futures ++= workerCommands.map { - case (worker, _, future) => notifyOnComplete(future, worker) + workers.map(worker => + worker.name -> createInvocation( + METHOD_UPDATE_EXECUTOR.getBareMethodName, + updateReq, + worker + ) + ) + }.toMap + futures += cmdMapping.map { + case (_, (_, singleWorkerUpdateFuture)) => singleWorkerUpdateFuture } friesComponent.sources.foreach { source => cp.workflowExecution.getLatestOperatorExecution(source).getWorkerIds.foreach { worker => @@ -120,7 +109,7 @@ trait ReconfigurationHandler { EmbeddedControlMessageIdentity(msg.reconfigurationId), ALL_ALIGNMENT, finalScope.toSet, - cmdMapping, + cmdMapping.map(x => (x._1, x._2._1)), ChannelIdentity(actorId, worker, isControl = true) ) } @@ -132,10 +121,4 @@ trait ReconfigurationHandler { } } - // After a worker's updateExecutor completes, notify the client so the - // ExecutionReconfigurationService can advance completedReconfigurations - // and emit ModifyLogicCompletedEvent on the websocket. - private def notifyOnComplete[T](future: Future[T], worker: ActorVirtualIdentity): Future[T] = - future.onSuccess(_ => sendToClient(UpdateExecutorCompleted(worker))) - } diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala index 84f1e8ec65..3aa5fa90a4 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala @@ -242,16 +242,7 @@ class DataProcessor( // invoke the control command carried with the ECM logger.info(s"process ECM from $channelId, id = ${ecm.id}, cmd = $command") if (command.isDefined) { - // The reply must go back to the actor that originated the invocation - // (recorded in command.context.sender), not to channelId.fromWorkerId. - // For ECM-embedded commands those differ: channelId is the data - // channel between two workers, while the originator is typically the - // controller. Fall back to the channel sender when the context is - // unset (e.g. unit-test inputs). - val ctx = command.get.context - val replyTo = - if (ctx.sender.name.nonEmpty) ctx.sender else channelId.fromWorkerId - asyncRPCServer.receive(command.get, replyTo) + asyncRPCServer.receive(command.get, channelId.fromWorkerId) } // if this worker is not the final destination of the ECM, pass it downstream val downstreamChannelsInScope = ecm.scope.filter(_.fromWorkerId == actorId).toSet diff --git a/amber/src/main/scala/org/apache/texera/web/service/ExecutionReconfigurationService.scala b/amber/src/main/scala/org/apache/texera/web/service/ExecutionReconfigurationService.scala index e7617fdfe1..e5867277fc 100644 --- a/amber/src/main/scala/org/apache/texera/web/service/ExecutionReconfigurationService.scala +++ b/amber/src/main/scala/org/apache/texera/web/service/ExecutionReconfigurationService.scala @@ -19,12 +19,7 @@ package org.apache.texera.web.service -import org.apache.texera.amber.core.virtualidentity.ActorVirtualIdentity -import org.apache.texera.amber.engine.architecture.controller.{UpdateExecutorCompleted, Workflow} -import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{ - UpdateExecutorRequest, - WorkflowReconfigureRequest -} +import org.apache.texera.amber.engine.architecture.controller.Workflow import org.apache.texera.amber.engine.common.client.AmberClient import org.apache.texera.web.SubscriptionManager import org.apache.texera.web.model.websocket.event.TexeraWebSocketEvent @@ -33,9 +28,8 @@ import org.apache.texera.web.model.websocket.response.{ ModifyLogicCompletedEvent, ModifyLogicResponse } -import org.apache.texera.web.storage.{ExecutionReconfigurationStore, ExecutionStateStore} +import org.apache.texera.web.storage.ExecutionStateStore -import java.util.UUID import scala.util.{Failure, Success} class ExecutionReconfigurationService( @@ -45,11 +39,34 @@ class ExecutionReconfigurationService( ) extends SubscriptionManager { // monitors notification from the engine that a reconfiguration on a worker is completed - registerWorkerCompletionCallback() + // client.registerCallback[UpdateExecutorCompleted]((evt: UpdateExecutorCompleted) => { + // stateStore.reconfigurationStore.updateState(old => { + // old.copy(completedReconfigurations = old.completedReconfigurations + evt.id) + // }) + // }) // monitors the reconfiguration state (completed workers) change, // notifies the frontend when all workers of an operator complete reconfiguration - registerCompletionDiffHandler() + addSubscription( + stateStore.reconfigurationStore.registerDiffHandler((oldState, newState) => { + if ( + oldState.completedReconfigurations != newState.completedReconfigurations + && oldState.currentReconfigId == newState.currentReconfigId + ) { + val diff = newState.completedReconfigurations -- oldState.completedReconfigurations + val newlyCompletedOps = diff + .map(workerId => workflow.physicalPlan.getPhysicalOpByWorkerId(workerId).id) + .map(opId => opId.logicalOpId.id) + if (newlyCompletedOps.nonEmpty) { + List(ModifyLogicCompletedEvent(newlyCompletedOps.toList)) + } else { + List() + } + } else { + List() + } + }) + ) // handles reconfigure workflow logic from frontend // validate the modify logic request and notifies the frontend @@ -79,77 +96,42 @@ class ExecutionReconfigurationService( // actually performs all reconfiguration requests the user made during pause // sends ModifyLogic messages to operators and workers, - // see the Fries reconfiguration paper for the algorithm. - // Note: StateTransferFunc is currently not threaded through to the engine — - // the new UpdateExecutorRequest only carries (targetOpId, newOpExecInitInfo). + // there are two modes: transactional or non-transactional + // in the transactional mode, reconfigurations on multiple operators will be synchronized + // in the non-transaction mode, they are not synchronized, this is faster, but can lead to consistency issues + // for details, see the Fries reconfiguration paper def performReconfigurationOnResume(): Unit = { val reconfigurations = stateStore.reconfigurationStore.getState.unscheduledReconfigurations if (reconfigurations.isEmpty) { return } - - val reconfigurationId = UUID.randomUUID().toString - val updateExecutorRequests = reconfigurations.map { - case (op, _) => UpdateExecutorRequest(op.id, op.opExecInitInfo) - } - dispatch( - WorkflowReconfigureRequest( - reconfiguration = updateExecutorRequests, - reconfigurationId = reconfigurationId - ) - ) - - // clear all un-scheduled reconfigurations, start a new reconfiguration ID - stateStore.reconfigurationStore.updateState(_ => - ExecutionReconfigurationStore(currentReconfigId = Some(reconfigurationId)) - ) - } - - // Seam for unit testing the dispatch path without spinning up an AmberClient. - protected def dispatch(request: WorkflowReconfigureRequest): Unit = { - client.controllerInterface.reconfigureWorkflow(request, ()) - } - - // Seam for unit testing — production wires the engine's UpdateExecutorCompleted - // events into the reconfiguration store so the diff handler above can fire - // ModifyLogicCompletedEvent for the frontend. - protected def registerWorkerCompletionCallback(): Unit = { - client.registerCallback[UpdateExecutorCompleted]((evt: UpdateExecutorCompleted) => { - onWorkerReconfigured(evt.id) - }) - } - - // Exposed (instead of inlined in the callback) so tests can drive the - // completion path directly. - private[service] def onWorkerReconfigured(worker: ActorVirtualIdentity): Unit = { - stateStore.reconfigurationStore.updateState(old => - old.copy(completedReconfigurations = old.completedReconfigurations + worker) - ) - } - - // Seam for unit testing — the diff handler dereferences workflow.physicalPlan - // to map worker → logical op, which makes constructing a service in tests - // require a full Workflow. Tests override to no-op. - protected def registerCompletionDiffHandler(): Unit = { - addSubscription( - stateStore.reconfigurationStore.registerDiffHandler((oldState, newState) => { - if ( - oldState.completedReconfigurations != newState.completedReconfigurations - && oldState.currentReconfigId == newState.currentReconfigId - ) { - val diff = newState.completedReconfigurations -- oldState.completedReconfigurations - val newlyCompletedOps = diff - .map(workerId => workflow.physicalPlan.getPhysicalOpByWorkerId(workerId).id) - .map(opId => opId.logicalOpId.id) - if (newlyCompletedOps.nonEmpty) { - List(ModifyLogicCompletedEvent(newlyCompletedOps.toList)) - } else { - List() - } - } else { - List() - } - }) - ) + throw new RuntimeException("reconfiguration is tentatively disabled.") + // // schedule all pending reconfigurations to the engine + // val reconfigurationId = UUID.randomUUID().toString + // val modifyLogicReq = AmberModifyLogicRequest(reconfigurations.map { + // case (op, stateTransferFunc) => + // val bytes = AmberRuntime.serde.serialize(op.opExecInitInfo).get + // val protoAny = Any.of( + // "org.apache.texera.amber.engine.architecture.deploysemantics.layer.OpExecInitInfo", + // ByteString.copyFrom(bytes) + // ) + // val stateTransferFuncOpt = stateTransferFunc.map { func => + // val bytes = AmberRuntime.serde.serialize(func).get + // Any.of( + // "org.apache.texera.workflow.common.operators.StateTransferFunc", + // ByteString.copyFrom(bytes) + // ) + // } + // UpdateExecutorRequest(op.id, protoAny, stateTransferFuncOpt) + // }) + // client.controllerInterface.reconfigureWorkflow( + // WorkflowReconfigureRequest(modifyLogicReq, reconfigurationId), + // () + // ) + // + // // clear all un-scheduled reconfigurations, start a new reconfiguration ID + // stateStore.reconfigurationStore.updateState(_ => + // ExecutionReconfigurationStore(Some(reconfigurationId)) + // ) } } diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ReconfigurationSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ReconfigurationSpec.scala index 92dfba19de..6f344caae3 100644 --- a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ReconfigurationSpec.scala +++ b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ReconfigurationSpec.scala @@ -150,15 +150,9 @@ class ReconfigurationSpec completion.setDone() } }) - Await.result( - client.controllerInterface.startWorkflow(EmptyRequest(), ()), - Duration.fromSeconds(5) - ) + Await.result(client.controllerInterface.startWorkflow(EmptyRequest(), ())) val pausedReached = stateReached(client, PAUSED) - Await.result( - client.controllerInterface.pauseWorkflow(EmptyRequest(), ()), - Duration.fromSeconds(5) - ) + Await.result(client.controllerInterface.pauseWorkflow(EmptyRequest(), ())) Await.result(pausedReached, Duration.fromSeconds(10)) val physicalOps = targetOps.flatMap(op => workflow.physicalPlan.getPhysicalOpsOfLogicalOp(op.operatorIdentifier) @@ -170,13 +164,9 @@ class ReconfigurationSpec reconfigurationId = "test-reconfigure-1" ), () - ), - Duration.fromSeconds(5) - ) - Await.result( - client.controllerInterface.resumeWorkflow(EmptyRequest(), ()), - Duration.fromSeconds(5) + ) ) + Await.result(client.controllerInterface.resumeWorkflow(EmptyRequest(), ())) Await.result(completion, Duration.fromMinutes(1)) result } diff --git a/amber/src/test/scala/org/apache/texera/web/service/ExecutionReconfigurationServiceSpec.scala b/amber/src/test/scala/org/apache/texera/web/service/ExecutionReconfigurationServiceSpec.scala deleted file mode 100644 index 974db13286..0000000000 --- a/amber/src/test/scala/org/apache/texera/web/service/ExecutionReconfigurationServiceSpec.scala +++ /dev/null @@ -1,149 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.texera.web.service - -import org.apache.texera.amber.core.executor.OpExecWithClassName -import org.apache.texera.amber.core.virtualidentity.{ - ActorVirtualIdentity, - ExecutionIdentity, - OperatorIdentity, - PhysicalOpIdentity, - WorkflowIdentity -} -import org.apache.texera.amber.core.workflow.PhysicalOp -import org.apache.texera.amber.engine.architecture.rpc.controlcommands.WorkflowReconfigureRequest -import org.apache.texera.web.storage.{ExecutionReconfigurationStore, ExecutionStateStore} -import org.scalatest.flatspec.AnyFlatSpec -import org.scalatest.matchers.should.Matchers - -import scala.collection.mutable.ArrayBuffer - -/** - * Web-service-layer tests for ExecutionReconfigurationService. - * - * The end-to-end engine path (reconfigureWorkflow → Fries algorithm → - * UpdateExecutor on workers) is covered by ReconfigurationSpec. - * This spec focuses on the wiring inside performReconfigurationOnResume: - * empty short-circuit, request construction, and store reset semantics. - */ -class ExecutionReconfigurationServiceSpec extends AnyFlatSpec with Matchers { - - private def mkPhysicalOp(name: String): PhysicalOp = - PhysicalOp( - id = PhysicalOpIdentity(OperatorIdentity(name), "main"), - workflowId = WorkflowIdentity(0L), - executionId = ExecutionIdentity(0L), - opExecInitInfo = OpExecWithClassName(s"$name.Class", "") - ) - - /** Service variant that records dispatched requests and skips the AmberClient - * registration / workflow-dependent diff handler so it can be constructed - * without a live engine. - */ - private class RecordingService(stateStore: ExecutionStateStore) - extends ExecutionReconfigurationService(client = null, stateStore, workflow = null) { - val captured: ArrayBuffer[WorkflowReconfigureRequest] = ArrayBuffer.empty - override protected def dispatch(request: WorkflowReconfigureRequest): Unit = - captured += request - override protected def registerWorkerCompletionCallback(): Unit = () - override protected def registerCompletionDiffHandler(): Unit = () - } - - "performReconfigurationOnResume" should - "return without dispatching when no reconfigurations are pending" in { - val stateStore = new ExecutionStateStore() - val service = new RecordingService(stateStore) - - noException should be thrownBy service.performReconfigurationOnResume() - - service.captured shouldBe empty - val state = stateStore.reconfigurationStore.getState - state.unscheduledReconfigurations shouldBe empty - state.currentReconfigId shouldBe None - state.completedReconfigurations shouldBe empty - } - - it should "dispatch one request carrying every pending reconfiguration and reset the store" in { - val stateStore = new ExecutionStateStore() - val service = new RecordingService(stateStore) - - val op1 = mkPhysicalOp("op-1") - val op2 = mkPhysicalOp("op-2") - stateStore.reconfigurationStore.updateState(_ => - ExecutionReconfigurationStore(unscheduledReconfigurations = List((op1, None), (op2, None))) - ) - - service.performReconfigurationOnResume() - - service.captured should have size 1 - val request = service.captured.head - request.reconfigurationId should not be empty - request.reconfiguration.map(_.targetOpId) should contain theSameElementsInOrderAs Seq( - op1.id, - op2.id - ) - request.reconfiguration.map(_.newExecInitInfo) should contain theSameElementsInOrderAs Seq( - op1.opExecInitInfo, - op2.opExecInitInfo - ) - - val state = stateStore.reconfigurationStore.getState - state.unscheduledReconfigurations shouldBe empty - state.currentReconfigId shouldBe Some(request.reconfigurationId) - state.completedReconfigurations shouldBe empty - } - - it should "use a fresh reconfigurationId on each dispatch" in { - val stateStore = new ExecutionStateStore() - val service = new RecordingService(stateStore) - - def queueAndDispatch(opName: String): String = { - stateStore.reconfigurationStore.updateState(old => - old.copy(unscheduledReconfigurations = List((mkPhysicalOp(opName), None))) - ) - service.performReconfigurationOnResume() - service.captured.last.reconfigurationId - } - - val firstId = queueAndDispatch("op-a") - val secondId = queueAndDispatch("op-b") - - firstId should not be secondId - stateStore.reconfigurationStore.getState.currentReconfigId shouldBe Some(secondId) - } - - "onWorkerReconfigured" should - "add the worker id to completedReconfigurations so the diff handler can fire" in { - val stateStore = new ExecutionStateStore() - val service = new RecordingService(stateStore) - - val w1 = ActorVirtualIdentity("Worker:WF1-E1-op-main-0") - val w2 = ActorVirtualIdentity("Worker:WF1-E1-op-main-1") - service.onWorkerReconfigured(w1) - service.onWorkerReconfigured(w2) - // duplicate completion is idempotent (Set semantics). - service.onWorkerReconfigured(w1) - - stateStore.reconfigurationStore.getState.completedReconfigurations should contain theSameElementsAs Set( - w1, - w2 - ) - } -}
