This is an automated email from the ASF dual-hosted git repository.

linxinyuan pushed a commit to branch xinyuan-loop-feb
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/xinyuan-loop-feb by this push:
     new 6be7dc5bcf update
6be7dc5bcf is described below

commit 6be7dc5bcf3b0aaf6df92e4eff59ddc03e2f9096
Author: Xinyuan Lin <[email protected]>
AuthorDate: Wed Feb 11 18:35:07 2026 -0800

    update
---
 .../messaginglayer/AmberFIFOChannel.scala          | 22 +++++++++++-----------
 .../scheduling/RegionExecutionCoordinator.scala    |  4 ++--
 2 files changed, 13 insertions(+), 13 deletions(-)

diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/AmberFIFOChannel.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/AmberFIFOChannel.scala
index d81b4239ba..7917721c9c 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/AmberFIFOChannel.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/AmberFIFOChannel.scala
@@ -41,18 +41,18 @@ class AmberFIFOChannel(val channelId: ChannelIdentity) extends AmberLogging {
   private var portId: Option[PortIdentity] = None
 
   def acceptMessage(msg: WorkflowFIFOMessage): Unit = {
-    val seq = msg.sequenceNumber
-    val payload = msg.payload
-    if (isDuplicated(seq)) {
-      logger.debug(
-        s"received duplicated message $payload with seq = $seq while current seq = $current"
-      )
-    } else if (isAhead(seq)) {
-      logger.debug(s"received ahead message $payload with seq = $seq while current seq = $current")
-      stash(seq, msg)
-    } else {
+    //val seq = msg.sequenceNumber
+    //val payload = msg.payload
+    //if (isDuplicated(seq)) {
+    //  logger.debug(
+    //    s"received duplicated message $payload with seq = $seq while current seq = $current"
+    //  )
+    //} else if (isAhead(seq)) {
+    //  logger.debug(s"received ahead message $payload with seq = $seq while current seq = $current")
+    //  stash(seq, msg)
+    //} else {
       enforceFIFO(msg)
-    }
+    //}
   }
 
   def getCurrentSeq: Long = current
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
index 4b861de657..2cddb29ba1 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
@@ -166,8 +166,8 @@ class RegionExecutionCoordinator(
                 val actorRef = actorRefService.getActorRef(workerId)
                 // Remove the actorRef so that no other actors can find the worker and send messages.
                 actorRefService.removeActorRef(workerId)
-                asyncRPCClient.outputGateway.removeControlChannel(workerId)
-                asyncRPCClient.inputGateway.removeControlChannel(workerId)
+                //asyncRPCClient.outputGateway.removeControlChannel(workerId)
+                //asyncRPCClient.inputGateway.removeControlChannel(workerId)
                 gracefulStop(actorRef, Duration(5, TimeUnit.SECONDS)).asTwitter()
               }
           }.toSeq

Reply via email to