This is an automated email from the ASF dual-hosted git repository.

linxinyuan pushed a commit to branch xinyuan-loop-feb
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/xinyuan-loop-feb by this push:
     new 706884f0b9 update
706884f0b9 is described below

commit 706884f0b9e32c60bf7bfb6586fc10ab98b8758c
Author: Xinyuan Lin <[email protected]>
AuthorDate: Tue Feb 10 20:32:39 2026 -0800

    update
---
 .../texera/amber/engine/architecture/common/AmberProcessor.scala  | 4 ++--
 .../apache/texera/amber/engine/common/rpc/AsyncRPCClient.scala    | 8 ++++++--
 2 files changed, 8 insertions(+), 4 deletions(-)

diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/AmberProcessor.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/AmberProcessor.scala
index e776307323..22811b4641 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/AmberProcessor.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/AmberProcessor.scala
@@ -43,7 +43,7 @@ abstract class AmberProcessor(
     with Serializable {
 
   /** FIFO & exactly once */
-  val inputGateway: InputGateway = new NetworkInputGateway(this.actorId)
+  val inputGateway: NetworkInputGateway = new NetworkInputGateway(this.actorId)
 
   // 1. Unified Output
   val outputGateway: NetworkOutputGateway =
@@ -55,7 +55,7 @@ abstract class AmberProcessor(
       }
     )
   // 2. RPC Layer
-  val asyncRPCClient = new AsyncRPCClient(outputGateway, actorId)
+  val asyncRPCClient = new AsyncRPCClient(inputGateway, outputGateway, actorId)
   val asyncRPCServer: AsyncRPCServer =
     new AsyncRPCServer(outputGateway, actorId)
 
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/common/rpc/AsyncRPCClient.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/common/rpc/AsyncRPCClient.scala
index 704ebd7f47..f7e26803b4 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/common/rpc/AsyncRPCClient.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/common/rpc/AsyncRPCClient.scala
@@ -27,7 +27,10 @@ import org.apache.texera.amber.core.virtualidentity.{
   EmbeddedControlMessageIdentity
 }
 import org.apache.texera.amber.engine.architecture.controller.ClientEvent
-import 
org.apache.texera.amber.engine.architecture.messaginglayer.NetworkOutputGateway
+import org.apache.texera.amber.engine.architecture.messaginglayer.{
+  NetworkInputGateway,
+  NetworkOutputGateway
+}
 import org.apache.texera.amber.engine.architecture.rpc.controlcommands._
 import 
org.apache.texera.amber.engine.architecture.rpc.controllerservice.ControllerServiceFs2Grpc
 import org.apache.texera.amber.engine.architecture.rpc.controlreturns.{
@@ -125,7 +128,8 @@ object AsyncRPCClient {
 }
 
 class AsyncRPCClient(
-    outputGateway: NetworkOutputGateway,
+    val inputGateway: NetworkInputGateway,
+    val outputGateway: NetworkOutputGateway,
     val actorId: ActorVirtualIdentity
 ) extends AmberLogging {
 

Reply via email to