This is an automated email from the ASF dual-hosted git repository.
linxinyuan pushed a commit to branch xinyuan-loop-feb
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/xinyuan-loop-feb by this push:
new 6be7dc5bcf update
6be7dc5bcf is described below
commit 6be7dc5bcf3b0aaf6df92e4eff59ddc03e2f9096
Author: Xinyuan Lin <[email protected]>
AuthorDate: Wed Feb 11 18:35:07 2026 -0800
update
---
.../messaginglayer/AmberFIFOChannel.scala | 22 +++++++++++-----------
.../scheduling/RegionExecutionCoordinator.scala | 4 ++--
2 files changed, 13 insertions(+), 13 deletions(-)
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/AmberFIFOChannel.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/AmberFIFOChannel.scala
index d81b4239ba..7917721c9c 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/AmberFIFOChannel.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/AmberFIFOChannel.scala
@@ -41,18 +41,18 @@ class AmberFIFOChannel(val channelId: ChannelIdentity)
extends AmberLogging {
private var portId: Option[PortIdentity] = None
def acceptMessage(msg: WorkflowFIFOMessage): Unit = {
- val seq = msg.sequenceNumber
- val payload = msg.payload
- if (isDuplicated(seq)) {
- logger.debug(
- s"received duplicated message $payload with seq = $seq while current seq = $current"
- )
- } else if (isAhead(seq)) {
- logger.debug(s"received ahead message $payload with seq = $seq while current seq = $current")
- stash(seq, msg)
- } else {
+ //val seq = msg.sequenceNumber
+ //val payload = msg.payload
+ //if (isDuplicated(seq)) {
+ // logger.debug(
+ // s"received duplicated message $payload with seq = $seq while current seq = $current"
+ // )
+ //} else if (isAhead(seq)) {
+ // logger.debug(s"received ahead message $payload with seq = $seq while current seq = $current")
+ // stash(seq, msg)
+ //} else {
enforceFIFO(msg)
- }
+ //}
}
def getCurrentSeq: Long = current
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
index 4b861de657..2cddb29ba1 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
@@ -166,8 +166,8 @@ class RegionExecutionCoordinator(
val actorRef = actorRefService.getActorRef(workerId)
// Remove the actorRef so that no other actors can find the worker and send messages.
actorRefService.removeActorRef(workerId)
- asyncRPCClient.outputGateway.removeControlChannel(workerId)
- asyncRPCClient.inputGateway.removeControlChannel(workerId)
+ //asyncRPCClient.outputGateway.removeControlChannel(workerId)
+ //asyncRPCClient.inputGateway.removeControlChannel(workerId)
gracefulStop(actorRef, Duration(5, TimeUnit.SECONDS)).asTwitter()
}
}.toSeq