This is an automated email from the ASF dual-hosted git repository.

aglinxinyuan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/main by this push:
     new 05b271c5cc feat(engine): add support for restarting regions (#4441)
05b271c5cc is described below

commit 05b271c5ccf1aff4eaa806840ed7ef3491da2f80
Author: Xinyuan Lin <[email protected]>
AuthorDate: Mon Apr 27 10:51:57 2026 -0700

    feat(engine): add support for restarting regions (#4441)
---
 .../architecture/common/AmberProcessor.scala       |  5 ++-
 .../controller/execution/WorkflowExecution.scala   | 14 ++++++++
 .../messaginglayer/NetworkInputGateway.scala       |  6 ++++
 .../messaginglayer/NetworkOutputGateway.scala      |  6 ++++
 .../scheduling/RegionExecutionCoordinator.scala    | 17 ++++++---
 .../scheduling/WorkflowExecutionCoordinator.scala  |  8 ++++-
 .../amber/engine/common/rpc/AsyncRPCClient.scala   |  8 +++--
 .../control/utils/TrivialControlTester.scala       | 14 +++++---
 .../messaginglayer/NetworkInputGatewaySpec.scala   | 12 +++++++
 .../workflow/WorkflowExecutionsResourceSpec.scala  | 40 ++++++++++++++++++++++
 10 files changed, 115 insertions(+), 15 deletions(-)

diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/AmberProcessor.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/AmberProcessor.scala
index e776307323..f1c8136fd8 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/AmberProcessor.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/AmberProcessor.scala
@@ -21,7 +21,6 @@ package org.apache.texera.amber.engine.architecture.common
 
 import org.apache.texera.amber.core.virtualidentity.{ActorVirtualIdentity, 
ChannelIdentity}
 import org.apache.texera.amber.engine.architecture.messaginglayer.{
-  InputGateway,
   NetworkInputGateway,
   NetworkOutputGateway
 }
@@ -43,7 +42,7 @@ abstract class AmberProcessor(
     with Serializable {
 
   /** FIFO & exactly once */
-  val inputGateway: InputGateway = new NetworkInputGateway(this.actorId)
+  val inputGateway: NetworkInputGateway = new NetworkInputGateway(this.actorId)
 
   // 1. Unified Output
   val outputGateway: NetworkOutputGateway =
@@ -55,7 +54,7 @@ abstract class AmberProcessor(
       }
     )
   // 2. RPC Layer
-  val asyncRPCClient = new AsyncRPCClient(outputGateway, actorId)
+  val asyncRPCClient = new AsyncRPCClient(inputGateway, outputGateway, actorId)
   val asyncRPCServer: AsyncRPCServer =
     new AsyncRPCServer(outputGateway, actorId)
 
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/WorkflowExecution.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/WorkflowExecution.scala
index b806479b89..c1e44bd5cc 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/WorkflowExecution.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/WorkflowExecution.scala
@@ -52,6 +52,18 @@ case class WorkflowExecution() {
     regionExecutions.getOrElseUpdate(region.id, RegionExecution(region))
   }
 
+  def restartRegionExecution(region: Region): RegionExecution = {
+    regionExecutions.get(region.id).foreach { existingRegionExecution =>
+      assert(
+        existingRegionExecution.isCompleted,
+        s"Cannot restart running RegionExecution of ${region.id}."
+      )
+    }
+    val regionExecution = RegionExecution(region)
+    regionExecutions.put(region.id, regionExecution)
+    regionExecution
+  }
+
   /**
     * Retrieves a specific `RegionExecution` by its identifier.
     *
@@ -60,6 +72,8 @@ case class WorkflowExecution() {
     */
   def getRegionExecution(regionId: RegionIdentity): RegionExecution = 
regionExecutions(regionId)
 
+  def hasRegionExecution(regionId: RegionIdentity): Boolean = 
regionExecutions.contains(regionId)
+
   /**
     * Retrieves all `RegionExecutions` that are currently in running state,
     * preserving the order in which they were created.
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/NetworkInputGateway.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/NetworkInputGateway.scala
index 5cfd8aabc0..aed5c36c4a 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/NetworkInputGateway.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/NetworkInputGateway.scala
@@ -86,4 +86,10 @@ class NetworkInputGateway(val actorId: ActorVirtualIdentity)
     enforcers += enforcer
   }
 
+  def removeControlChannel(from: ActorVirtualIdentity): Unit = {
+    synchronized {
+      inputChannels.remove(ChannelIdentity(from, actorId, isControl = true))
+    }
+  }
+
 }
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/NetworkOutputGateway.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/NetworkOutputGateway.scala
index 929a30f4ef..ea7034e1d7 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/NetworkOutputGateway.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/NetworkOutputGateway.scala
@@ -94,4 +94,10 @@ class NetworkOutputGateway(
     idToSequenceNums.getOrElseUpdate(channelId, new 
AtomicLong()).getAndIncrement()
   }
 
+  def removeControlChannel(to: ActorVirtualIdentity): Unit = {
+    synchronized {
+      idToSequenceNums.remove(ChannelIdentity(actorId, to, isControl = true))
+    }
+  }
+
 }
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
index e490cde3d9..db9c583a68 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
@@ -91,6 +91,7 @@ import scala.concurrent.duration.Duration
   */
 class RegionExecutionCoordinator(
     region: Region,
+    isRestart: Boolean,
     workflowExecution: WorkflowExecution,
     asyncRPCClient: AsyncRPCClient,
     controllerConfig: ControllerConfig,
@@ -167,6 +168,10 @@ class RegionExecutionCoordinator(
                 val actorRef = actorRefService.getActorRef(workerId)
                 // Remove the actorRef so that no other actors can find the 
worker and send messages.
                 actorRefService.removeActorRef(workerId)
+                // Restarted regions reuse actorId. Remove stale control 
channels so the
+                // controller does not reuse old control-message sequence 
numbers for new workers.
+                asyncRPCClient.inputGateway.removeControlChannel(workerId)
+                asyncRPCClient.outputGateway.removeControlChannel(workerId)
                 gracefulStop(actorRef, Duration(5, 
TimeUnit.SECONDS)).asTwitter()
               }
           }.toSeq
@@ -534,11 +539,13 @@ class RegionExecutionCoordinator(
         val schema =
           schemaOptional.getOrElse(throw new IllegalStateException("Schema is 
missing"))
         DocumentFactory.createDocument(storageUriToAdd, schema)
-        WorkflowExecutionsResource.insertOperatorPortResultUri(
-          eid = eid,
-          globalPortId = outputPortId,
-          uri = storageUriToAdd
-        )
+        if (!isRestart) {
+          WorkflowExecutionsResource.insertOperatorPortResultUri(
+            eid = eid,
+            globalPortId = outputPortId,
+            uri = storageUriToAdd
+          )
+        }
     }
   }
 
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala
index 05585f88d8..5a9c84c701 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala
@@ -85,9 +85,15 @@ class WorkflowExecutionCoordinator(
         executedRegions.append(nextRegions)
         nextRegions
           .map(region => {
-            workflowExecution.initRegionExecution(region)
+            val isRestart = workflowExecution.hasRegionExecution(region.id)
+            if (isRestart) {
+              workflowExecution.restartRegionExecution(region)
+            } else {
+              workflowExecution.initRegionExecution(region)
+            }
             regionExecutionCoordinators(region.id) = new 
RegionExecutionCoordinator(
               region,
+              isRestart,
               workflowExecution,
               asyncRPCClient,
               controllerConfig,
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/common/rpc/AsyncRPCClient.scala b/amber/src/main/scala/org/apache/texera/amber/engine/common/rpc/AsyncRPCClient.scala
index 704ebd7f47..f7e26803b4 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/common/rpc/AsyncRPCClient.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/common/rpc/AsyncRPCClient.scala
@@ -27,7 +27,10 @@ import org.apache.texera.amber.core.virtualidentity.{
   EmbeddedControlMessageIdentity
 }
 import org.apache.texera.amber.engine.architecture.controller.ClientEvent
-import 
org.apache.texera.amber.engine.architecture.messaginglayer.NetworkOutputGateway
+import org.apache.texera.amber.engine.architecture.messaginglayer.{
+  NetworkInputGateway,
+  NetworkOutputGateway
+}
 import org.apache.texera.amber.engine.architecture.rpc.controlcommands._
 import 
org.apache.texera.amber.engine.architecture.rpc.controllerservice.ControllerServiceFs2Grpc
 import org.apache.texera.amber.engine.architecture.rpc.controlreturns.{
@@ -125,7 +128,8 @@ object AsyncRPCClient {
 }
 
 class AsyncRPCClient(
-    outputGateway: NetworkOutputGateway,
+    val inputGateway: NetworkInputGateway,
+    val outputGateway: NetworkOutputGateway,
     val actorId: ActorVirtualIdentity
 ) extends AmberLogging {
 
diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/control/utils/TrivialControlTester.scala b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/control/utils/TrivialControlTester.scala
index e236ac760b..ca86338d36 100644
--- a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/control/utils/TrivialControlTester.scala
+++ b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/control/utils/TrivialControlTester.scala
@@ -24,7 +24,10 @@ import org.apache.texera.amber.core.virtualidentity.{ActorVirtualIdentity, Chann
 import 
org.apache.texera.amber.engine.architecture.common.WorkflowActor.NetworkAck
 import org.apache.texera.amber.engine.architecture.common.{AmberProcessor, 
WorkflowActor}
 import 
org.apache.texera.amber.engine.architecture.control.utils.TrivialControlTester.ControlTesterRPCClient
-import 
org.apache.texera.amber.engine.architecture.messaginglayer.NetworkOutputGateway
+import org.apache.texera.amber.engine.architecture.messaginglayer.{
+  NetworkInputGateway,
+  NetworkOutputGateway
+}
 import 
org.apache.texera.amber.engine.architecture.rpc.controlcommands.AsyncRPCContext
 import 
org.apache.texera.amber.engine.architecture.rpc.testerservice.RPCTesterFs2Grpc
 import org.apache.texera.amber.engine.common.CheckpointState
@@ -37,8 +40,11 @@ import org.apache.texera.amber.engine.common.ambermessage.{
 import org.apache.texera.amber.engine.common.rpc.AsyncRPCClient
 
 object TrivialControlTester {
-  class ControlTesterRPCClient(outputGateway: NetworkOutputGateway, actorId: 
ActorVirtualIdentity)
-      extends AsyncRPCClient(outputGateway, actorId) {
+  class ControlTesterRPCClient(
+      inputGateway: NetworkInputGateway,
+      outputGateway: NetworkOutputGateway,
+      actorId: ActorVirtualIdentity
+  ) extends AsyncRPCClient(inputGateway, outputGateway, actorId) {
     val getProxy: RPCTesterFs2Grpc[Future, AsyncRPCContext] =
       AsyncRPCClient
         .createProxy[RPCTesterFs2Grpc[Future, AsyncRPCContext]](createPromise, 
outputGateway)
@@ -55,7 +61,7 @@ class TrivialControlTester(
       case Right(value) => transferService.send(value)
     }
   ) {
-    override val asyncRPCClient = new ControlTesterRPCClient(outputGateway, id)
+    override val asyncRPCClient = new ControlTesterRPCClient(inputGateway, 
outputGateway, id)
   }
   val initializer =
     new TesterAsyncRPCHandlerInitializer(ap.actorId, ap.asyncRPCClient, 
ap.asyncRPCServer)
diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/messaginglayer/NetworkInputGatewaySpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/messaginglayer/NetworkInputGatewaySpec.scala
index 1679144354..04f4f00045 100644
--- a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/messaginglayer/NetworkInputGatewaySpec.scala
+++ b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/messaginglayer/NetworkInputGatewaySpec.scala
@@ -84,4 +84,16 @@ class NetworkInputGatewaySpec extends AnyFlatSpec with MockFactory {
 
   }
 
+  "network input port" should "remove control channel by sender" in {
+    val inputPort = new NetworkInputGateway(fakeReceiverID)
+    val controlChannelId = ChannelIdentity(fakeSenderID, fakeReceiverID, 
isControl = true)
+    inputPort.getChannel(controlChannelId)
+
+    assert(inputPort.getAllControlChannels.size == 1)
+
+    inputPort.removeControlChannel(fakeSenderID)
+
+    assert(inputPort.getAllControlChannels.isEmpty)
+  }
+
 }
diff --git a/amber/src/test/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResourceSpec.scala b/amber/src/test/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResourceSpec.scala
index 1bd7fcdf6c..bd55124a72 100644
--- a/amber/src/test/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResourceSpec.scala
+++ b/amber/src/test/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResourceSpec.scala
@@ -19,6 +19,13 @@
 
 package org.apache.texera.web.resource.dashboard.user.workflow
 
+import org.apache.texera.amber.core.virtualidentity.{
+  ExecutionIdentity,
+  OperatorIdentity,
+  PhysicalOpIdentity
+}
+import org.apache.texera.amber.core.workflow.{GlobalPortIdentity, PortIdentity}
+import org.apache.texera.amber.util.serde.GlobalPortIdentitySerde.SerdeOps
 import org.apache.texera.dao.MockTexeraDB
 import org.apache.texera.dao.jooq.generated.Tables._
 import org.apache.texera.dao.jooq.generated.tables.daos.{
@@ -36,6 +43,7 @@ import org.apache.texera.dao.jooq.generated.tables.pojos.{
 import org.scalatest.flatspec.AnyFlatSpec
 import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, 
PrivateMethodTester}
 
+import java.net.URI
 import java.sql.Timestamp
 import java.util.UUID
 import java.util.concurrent.TimeUnit
@@ -176,4 +184,36 @@ class WorkflowExecutionsResourceSpec
     )
   }
 
+  "WorkflowExecutionsResource.insertOperatorPortResultUri" should "insert a 
result URI row" in {
+    val execution = new WorkflowExecutions
+    execution.setVid(testVersion.getVid)
+    execution.setUid(testUser.getUid)
+    execution.setStatus(0.toByte)
+    execution.setResult("")
+    execution.setStartingTime(new Timestamp(System.currentTimeMillis()))
+    execution.setBookmarked(false)
+    execution.setName("Execution with duplicate result URI insert")
+    execution.setEnvironmentVersion("test-env-1.0")
+    workflowExecutionsDao.insert(execution)
+
+    val executionId = ExecutionIdentity(execution.getEid.longValue())
+    val globalPortId = GlobalPortIdentity(
+      PhysicalOpIdentity(OperatorIdentity("operator-1"), "main"),
+      PortIdentity(),
+      input = false
+    )
+    val uri = URI.create("vfs:///test-result")
+
+    WorkflowExecutionsResource.insertOperatorPortResultUri(executionId, 
globalPortId, uri)
+
+    val rows = getDSLContext
+      .selectFrom(OPERATOR_PORT_EXECUTIONS)
+      
.where(OPERATOR_PORT_EXECUTIONS.WORKFLOW_EXECUTION_ID.eq(execution.getEid))
+      
.and(OPERATOR_PORT_EXECUTIONS.GLOBAL_PORT_ID.eq(globalPortId.serializeAsString))
+      .fetch()
+
+    assert(rows.size() == 1)
+    assert(rows.get(0).getResultUri == uri.toString)
+  }
+
 }

Reply via email to