This is an automated email from the ASF dual-hosted git repository.
aglinxinyuan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new 05b271c5cc feat(engine): add support for restarting regions (#4441)
05b271c5cc is described below
commit 05b271c5ccf1aff4eaa806840ed7ef3491da2f80
Author: Xinyuan Lin <[email protected]>
AuthorDate: Mon Apr 27 10:51:57 2026 -0700
feat(engine): add support for restarting regions (#4441)
---
.../architecture/common/AmberProcessor.scala | 5 ++-
.../controller/execution/WorkflowExecution.scala | 14 ++++++++
.../messaginglayer/NetworkInputGateway.scala | 6 ++++
.../messaginglayer/NetworkOutputGateway.scala | 6 ++++
.../scheduling/RegionExecutionCoordinator.scala | 17 ++++++---
.../scheduling/WorkflowExecutionCoordinator.scala | 8 ++++-
.../amber/engine/common/rpc/AsyncRPCClient.scala | 8 +++--
.../control/utils/TrivialControlTester.scala | 14 +++++---
.../messaginglayer/NetworkInputGatewaySpec.scala | 12 +++++++
.../workflow/WorkflowExecutionsResourceSpec.scala | 40 ++++++++++++++++++++++
10 files changed, 115 insertions(+), 15 deletions(-)
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/AmberProcessor.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/AmberProcessor.scala
index e776307323..f1c8136fd8 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/AmberProcessor.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/AmberProcessor.scala
@@ -21,7 +21,6 @@ package org.apache.texera.amber.engine.architecture.common
import org.apache.texera.amber.core.virtualidentity.{ActorVirtualIdentity, ChannelIdentity}
import org.apache.texera.amber.engine.architecture.messaginglayer.{
- InputGateway,
NetworkInputGateway,
NetworkOutputGateway
}
@@ -43,7 +42,7 @@ abstract class AmberProcessor(
with Serializable {
/** FIFO & exactly once */
- val inputGateway: InputGateway = new NetworkInputGateway(this.actorId)
+ val inputGateway: NetworkInputGateway = new NetworkInputGateway(this.actorId)
// 1. Unified Output
val outputGateway: NetworkOutputGateway =
@@ -55,7 +54,7 @@ abstract class AmberProcessor(
}
)
// 2. RPC Layer
- val asyncRPCClient = new AsyncRPCClient(outputGateway, actorId)
+ val asyncRPCClient = new AsyncRPCClient(inputGateway, outputGateway, actorId)
val asyncRPCServer: AsyncRPCServer =
new AsyncRPCServer(outputGateway, actorId)
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/WorkflowExecution.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/WorkflowExecution.scala
index b806479b89..c1e44bd5cc 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/WorkflowExecution.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/WorkflowExecution.scala
@@ -52,6 +52,18 @@ case class WorkflowExecution() {
regionExecutions.getOrElseUpdate(region.id, RegionExecution(region))
}
+ def restartRegionExecution(region: Region): RegionExecution = {
+ regionExecutions.get(region.id).foreach { existingRegionExecution =>
+ assert(
+ existingRegionExecution.isCompleted,
+ s"Cannot restart running RegionExecution of ${region.id}."
+ )
+ }
+ val regionExecution = RegionExecution(region)
+ regionExecutions.put(region.id, regionExecution)
+ regionExecution
+ }
+
/**
* Retrieves a specific `RegionExecution` by its identifier.
*
@@ -60,6 +72,8 @@ case class WorkflowExecution() {
*/
def getRegionExecution(regionId: RegionIdentity): RegionExecution =
regionExecutions(regionId)
+ def hasRegionExecution(regionId: RegionIdentity): Boolean = regionExecutions.contains(regionId)
+
/**
* Retrieves all `RegionExecutions` that are currently in running state,
* preserving the order in which they were created.
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/NetworkInputGateway.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/NetworkInputGateway.scala
index 5cfd8aabc0..aed5c36c4a 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/NetworkInputGateway.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/NetworkInputGateway.scala
@@ -86,4 +86,10 @@ class NetworkInputGateway(val actorId: ActorVirtualIdentity)
enforcers += enforcer
}
+ def removeControlChannel(from: ActorVirtualIdentity): Unit = {
+ synchronized {
+ inputChannels.remove(ChannelIdentity(from, actorId, isControl = true))
+ }
+ }
+
}
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/NetworkOutputGateway.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/NetworkOutputGateway.scala
index 929a30f4ef..ea7034e1d7 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/NetworkOutputGateway.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/NetworkOutputGateway.scala
@@ -94,4 +94,10 @@ class NetworkOutputGateway(
idToSequenceNums.getOrElseUpdate(channelId, new AtomicLong()).getAndIncrement()
}
+ def removeControlChannel(to: ActorVirtualIdentity): Unit = {
+ synchronized {
+ idToSequenceNums.remove(ChannelIdentity(actorId, to, isControl = true))
+ }
+ }
+
}
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
index e490cde3d9..db9c583a68 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
@@ -91,6 +91,7 @@ import scala.concurrent.duration.Duration
*/
class RegionExecutionCoordinator(
region: Region,
+ isRestart: Boolean,
workflowExecution: WorkflowExecution,
asyncRPCClient: AsyncRPCClient,
controllerConfig: ControllerConfig,
@@ -167,6 +168,10 @@ class RegionExecutionCoordinator(
val actorRef = actorRefService.getActorRef(workerId)
// Remove the actorRef so that no other actors can find the worker and send messages.
actorRefService.removeActorRef(workerId)
+ // Restarted regions reuse actorId. Remove stale control channels so the
+ // controller does not reuse old control-message sequence numbers for new workers.
+ asyncRPCClient.inputGateway.removeControlChannel(workerId)
+ asyncRPCClient.outputGateway.removeControlChannel(workerId)
gracefulStop(actorRef, Duration(5, TimeUnit.SECONDS)).asTwitter()
}
}.toSeq
@@ -534,11 +539,13 @@ class RegionExecutionCoordinator(
val schema =
schemaOptional.getOrElse(throw new IllegalStateException("Schema is missing"))
DocumentFactory.createDocument(storageUriToAdd, schema)
- WorkflowExecutionsResource.insertOperatorPortResultUri(
- eid = eid,
- globalPortId = outputPortId,
- uri = storageUriToAdd
- )
+ if (!isRestart) {
+ WorkflowExecutionsResource.insertOperatorPortResultUri(
+ eid = eid,
+ globalPortId = outputPortId,
+ uri = storageUriToAdd
+ )
+ }
}
}
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala
index 05585f88d8..5a9c84c701 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala
@@ -85,9 +85,15 @@ class WorkflowExecutionCoordinator(
executedRegions.append(nextRegions)
nextRegions
.map(region => {
- workflowExecution.initRegionExecution(region)
+ val isRestart = workflowExecution.hasRegionExecution(region.id)
+ if (isRestart) {
+ workflowExecution.restartRegionExecution(region)
+ } else {
+ workflowExecution.initRegionExecution(region)
+ }
regionExecutionCoordinators(region.id) = new RegionExecutionCoordinator(
region,
+ isRestart,
workflowExecution,
asyncRPCClient,
controllerConfig,
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/common/rpc/AsyncRPCClient.scala b/amber/src/main/scala/org/apache/texera/amber/engine/common/rpc/AsyncRPCClient.scala
index 704ebd7f47..f7e26803b4 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/common/rpc/AsyncRPCClient.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/common/rpc/AsyncRPCClient.scala
@@ -27,7 +27,10 @@ import org.apache.texera.amber.core.virtualidentity.{
EmbeddedControlMessageIdentity
}
import org.apache.texera.amber.engine.architecture.controller.ClientEvent
-import org.apache.texera.amber.engine.architecture.messaginglayer.NetworkOutputGateway
+import org.apache.texera.amber.engine.architecture.messaginglayer.{
+ NetworkInputGateway,
+ NetworkOutputGateway
+}
import org.apache.texera.amber.engine.architecture.rpc.controlcommands._
import org.apache.texera.amber.engine.architecture.rpc.controllerservice.ControllerServiceFs2Grpc
import org.apache.texera.amber.engine.architecture.rpc.controlreturns.{
@@ -125,7 +128,8 @@ object AsyncRPCClient {
}
class AsyncRPCClient(
- outputGateway: NetworkOutputGateway,
+ val inputGateway: NetworkInputGateway,
+ val outputGateway: NetworkOutputGateway,
val actorId: ActorVirtualIdentity
) extends AmberLogging {
diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/control/utils/TrivialControlTester.scala b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/control/utils/TrivialControlTester.scala
index e236ac760b..ca86338d36 100644
--- a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/control/utils/TrivialControlTester.scala
+++ b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/control/utils/TrivialControlTester.scala
@@ -24,7 +24,10 @@ import org.apache.texera.amber.core.virtualidentity.{ActorVirtualIdentity, Chann
import org.apache.texera.amber.engine.architecture.common.WorkflowActor.NetworkAck
import org.apache.texera.amber.engine.architecture.common.{AmberProcessor, WorkflowActor}
import org.apache.texera.amber.engine.architecture.control.utils.TrivialControlTester.ControlTesterRPCClient
-import org.apache.texera.amber.engine.architecture.messaginglayer.NetworkOutputGateway
+import org.apache.texera.amber.engine.architecture.messaginglayer.{
+ NetworkInputGateway,
+ NetworkOutputGateway
+}
import org.apache.texera.amber.engine.architecture.rpc.controlcommands.AsyncRPCContext
import org.apache.texera.amber.engine.architecture.rpc.testerservice.RPCTesterFs2Grpc
import org.apache.texera.amber.engine.common.CheckpointState
@@ -37,8 +40,11 @@ import org.apache.texera.amber.engine.common.ambermessage.{
import org.apache.texera.amber.engine.common.rpc.AsyncRPCClient
object TrivialControlTester {
- class ControlTesterRPCClient(outputGateway: NetworkOutputGateway, actorId: ActorVirtualIdentity)
- extends AsyncRPCClient(outputGateway, actorId) {
+ class ControlTesterRPCClient(
+ inputGateway: NetworkInputGateway,
+ outputGateway: NetworkOutputGateway,
+ actorId: ActorVirtualIdentity
+ ) extends AsyncRPCClient(inputGateway, outputGateway, actorId) {
val getProxy: RPCTesterFs2Grpc[Future, AsyncRPCContext] =
AsyncRPCClient
.createProxy[RPCTesterFs2Grpc[Future, AsyncRPCContext]](createPromise, outputGateway)
@@ -55,7 +61,7 @@ class TrivialControlTester(
case Right(value) => transferService.send(value)
}
) {
- override val asyncRPCClient = new ControlTesterRPCClient(outputGateway, id)
+ override val asyncRPCClient = new ControlTesterRPCClient(inputGateway, outputGateway, id)
}
val initializer =
new TesterAsyncRPCHandlerInitializer(ap.actorId, ap.asyncRPCClient, ap.asyncRPCServer)
diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/messaginglayer/NetworkInputGatewaySpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/messaginglayer/NetworkInputGatewaySpec.scala
index 1679144354..04f4f00045 100644
--- a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/messaginglayer/NetworkInputGatewaySpec.scala
+++ b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/messaginglayer/NetworkInputGatewaySpec.scala
@@ -84,4 +84,16 @@ class NetworkInputGatewaySpec extends AnyFlatSpec with MockFactory {
}
+ "network input port" should "remove control channel by sender" in {
+ val inputPort = new NetworkInputGateway(fakeReceiverID)
+ val controlChannelId = ChannelIdentity(fakeSenderID, fakeReceiverID, isControl = true)
+ inputPort.getChannel(controlChannelId)
+
+ assert(inputPort.getAllControlChannels.size == 1)
+
+ inputPort.removeControlChannel(fakeSenderID)
+
+ assert(inputPort.getAllControlChannels.isEmpty)
+ }
+
}
diff --git a/amber/src/test/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResourceSpec.scala b/amber/src/test/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResourceSpec.scala
index 1bd7fcdf6c..bd55124a72 100644
--- a/amber/src/test/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResourceSpec.scala
+++ b/amber/src/test/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResourceSpec.scala
@@ -19,6 +19,13 @@
package org.apache.texera.web.resource.dashboard.user.workflow
+import org.apache.texera.amber.core.virtualidentity.{
+ ExecutionIdentity,
+ OperatorIdentity,
+ PhysicalOpIdentity
+}
+import org.apache.texera.amber.core.workflow.{GlobalPortIdentity, PortIdentity}
+import org.apache.texera.amber.util.serde.GlobalPortIdentitySerde.SerdeOps
import org.apache.texera.dao.MockTexeraDB
import org.apache.texera.dao.jooq.generated.Tables._
import org.apache.texera.dao.jooq.generated.tables.daos.{
@@ -36,6 +43,7 @@ import org.apache.texera.dao.jooq.generated.tables.pojos.{
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, PrivateMethodTester}
+import java.net.URI
import java.sql.Timestamp
import java.util.UUID
import java.util.concurrent.TimeUnit
@@ -176,4 +184,36 @@ class WorkflowExecutionsResourceSpec
)
}
+ "WorkflowExecutionsResource.insertOperatorPortResultUri" should "insert a result URI row" in {
+ val execution = new WorkflowExecutions
+ execution.setVid(testVersion.getVid)
+ execution.setUid(testUser.getUid)
+ execution.setStatus(0.toByte)
+ execution.setResult("")
+ execution.setStartingTime(new Timestamp(System.currentTimeMillis()))
+ execution.setBookmarked(false)
+ execution.setName("Execution with duplicate result URI insert")
+ execution.setEnvironmentVersion("test-env-1.0")
+ workflowExecutionsDao.insert(execution)
+
+ val executionId = ExecutionIdentity(execution.getEid.longValue())
+ val globalPortId = GlobalPortIdentity(
+ PhysicalOpIdentity(OperatorIdentity("operator-1"), "main"),
+ PortIdentity(),
+ input = false
+ )
+ val uri = URI.create("vfs:///test-result")
+
+ WorkflowExecutionsResource.insertOperatorPortResultUri(executionId, globalPortId, uri)
+
+ val rows = getDSLContext
+ .selectFrom(OPERATOR_PORT_EXECUTIONS)
+ .where(OPERATOR_PORT_EXECUTIONS.WORKFLOW_EXECUTION_ID.eq(execution.getEid))
+ .and(OPERATOR_PORT_EXECUTIONS.GLOBAL_PORT_ID.eq(globalPortId.serializeAsString))
+ .fetch()
+
+ assert(rows.size() == 1)
+ assert(rows.get(0).getResultUri == uri.toString)
+ }
+
}