This is an automated email from the ASF dual-hosted git repository.

linxinyuan pushed a commit to branch xinyuan-loop-feb
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/xinyuan-loop-feb by this push:
     new 21e6a41bb5 update
21e6a41bb5 is described below

commit 21e6a41bb503d23ab1d67af3c142e6237528ac66
Author: Xinyuan Lin <[email protected]>
AuthorDate: Tue Feb 10 20:33:28 2026 -0800

    update
---
 .../engine/architecture/messaginglayer/NetworkInputGateway.scala      | 4 ++++
 .../engine/architecture/messaginglayer/NetworkOutputGateway.scala     | 4 ++++
 .../engine/architecture/scheduling/RegionExecutionCoordinator.scala   | 2 ++
 3 files changed, 10 insertions(+)

diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/NetworkInputGateway.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/NetworkInputGateway.scala
index 5cfd8aabc0..1d3ee3cb72 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/NetworkInputGateway.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/NetworkInputGateway.scala
@@ -86,4 +86,8 @@ class NetworkInputGateway(val actorId: ActorVirtualIdentity)
     enforcers += enforcer
   }
 
+  def removeControlChannel(from: ActorVirtualIdentity): Unit = {
+    inputChannels.remove(ChannelIdentity(from, actorId, isControl = true))
+  }
+
 }
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/NetworkOutputGateway.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/NetworkOutputGateway.scala
index 929a30f4ef..e35e819d41 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/NetworkOutputGateway.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/NetworkOutputGateway.scala
@@ -94,4 +94,8 @@ class NetworkOutputGateway(
     idToSequenceNums.getOrElseUpdate(channelId, new AtomicLong()).getAndIncrement()
   }
 
+  def removeControlChannel(to: ActorVirtualIdentity): Unit = {
+    idToSequenceNums.remove(ChannelIdentity(actorId, to, isControl = true))
+  }
+
 }
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
index 7e5b228801..4b861de657 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
@@ -166,6 +166,8 @@ class RegionExecutionCoordinator(
                 val actorRef = actorRefService.getActorRef(workerId)
                 // Remove the actorRef so that no other actors can find the worker and send messages.
                 actorRefService.removeActorRef(workerId)
+                asyncRPCClient.outputGateway.removeControlChannel(workerId)
+                asyncRPCClient.inputGateway.removeControlChannel(workerId)
                 gracefulStop(actorRef, Duration(5, TimeUnit.SECONDS)).asTwitter()
               }
           }.toSeq

Reply via email to