This is an automated email from the ASF dual-hosted git repository.

shengquan pushed a commit to branch shengquan-add-reconfigration
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/shengquan-add-reconfigration 
by this push:
     new 94a302e38a make java end working
94a302e38a is described below

commit 94a302e38a5d9b80f4291cae2417ca7bdf1b5827
Author: Shengquan Ni <[email protected]>
AuthorDate: Sat Feb 14 21:23:48 2026 -0800

    make java end working
---
 .../handlers/control/update_executor_handler.py    |  2 +-
 .../main/python/core/runnables/network_receiver.py |  1 +
 .../DataProcessorRPCHandlerInitializer.scala       |  2 +-
 .../InitializeExecutorHandler.scala                |  2 +-
 .../promisehandlers/UpdateExecutorHandler.scala    |  2 +-
 .../texera/amber/engine/e2e/ModifyLogicSpec.scala  | 22 +++++++++++++++++++---
 6 files changed, 24 insertions(+), 7 deletions(-)

diff --git 
a/amber/src/main/python/core/architecture/handlers/control/update_executor_handler.py
 
b/amber/src/main/python/core/architecture/handlers/control/update_executor_handler.py
index a5bc1a6c0f..7449cbb4b5 100644
--- 
a/amber/src/main/python/core/architecture/handlers/control/update_executor_handler.py
+++ 
b/amber/src/main/python/core/architecture/handlers/control/update_executor_handler.py
@@ -1 +1 @@
-# Licensed to the Apache Software Foundation (ASF) under one # or more 
contributor license agreements.  See the NOTICE file # distributed with this 
work for additional information # regarding copyright ownership.  The ASF 
licenses this file # to you under the Apache License, Version 2.0 (the # 
"License"); you may not use this file except in compliance # with the License.  
You may obtain a copy of the License at # #   
http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable 
law or agreed to in writing, # software distributed under the License is 
distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # 
KIND, either express or implied.  See the License for the # specific language 
governing permissions and limitations # under the License.  from 
core.architecture.handlers.control.control_handler_base import ControlHandler 
from proto.org.apache.texera.amber.engine.architecture.rpc import (     
EmptyReturn,     UpdateExecutorRequest, )   class Update
 ExecutorHandler(ControlHandler):     async def update_executor(self, req: 
UpdateExecutorRequest) -> EmptyReturn:         
self.context.executor_manager.update_executor(req.code, req.is_source)         
return EmptyReturn()
\ No newline at end of file
+# Licensed to the Apache Software Foundation (ASF) under one # or more 
contributor license agreements.  See the NOTICE file # distributed with this 
work for additional information # regarding copyright ownership.  The ASF 
licenses this file # to you under the Apache License, Version 2.0 (the # 
"License"); you may not use this file except in compliance # with the License.  
You may obtain a copy of the License at # #   
http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable 
law or agreed to in writing, # software distributed under the License is 
distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # 
KIND, either express or implied.  See the License for the # specific language 
governing permissions and limitations # under the License.  from 
core.architecture.handlers.control.control_handler_base import ControlHandler 
from proto.org.apache.texera.amber.engine.architecture.rpc import (     
EmptyReturn,     UpdateExecutorRequest, ) from loguru im
 port logger   class UpdateExecutorHandler(ControlHandler):     async def 
update_executor(self, req: UpdateExecutorRequest) -> EmptyReturn:         
self.context.executor_manager.update_executor(req.code, req.is_source)         
return EmptyReturn()
\ No newline at end of file
diff --git a/amber/src/main/python/core/runnables/network_receiver.py 
b/amber/src/main/python/core/runnables/network_receiver.py
index fd42a8f589..879b623b76 100644
--- a/amber/src/main/python/core/runnables/network_receiver.py
+++ b/amber/src/main/python/core/runnables/network_receiver.py
@@ -126,6 +126,7 @@ class NetworkReceiver(Runnable, Stoppable):
                     payload=python_control_message.payload,
                 )
             )
+            logger.info(python_control_message.payload)
             return shared_queue.in_mem_size()
 
         self._proxy_server.register_control_handler(control_handler)
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessorRPCHandlerInitializer.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessorRPCHandlerInitializer.scala
index d2837860f5..cfbc40b587 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessorRPCHandlerInitializer.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessorRPCHandlerInitializer.scala
@@ -72,7 +72,7 @@ class DataProcessorRPCHandlerInitializer(val dp: 
DataProcessor)
 
   override def noOperation(request: EmptyRequest, ctx: AsyncRPCContext): 
Future[EmptyReturn] = ???
 
-  def initializeExecutor(execInitInfo: OpExecInitInfo, workerIdx: Int, 
workerCount: Int): Unit = {
+  def setupExecutor(execInitInfo: OpExecInitInfo, workerIdx: Int, workerCount: 
Int): Unit = {
     dp.executor = execInitInfo match {
       case OpExecWithClassName(className, descString) =>
         ExecFactory.newExecFromJavaClassName(className, descString, workerIdx, 
workerCount)
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala
index f6e7f3d8b2..a7fd26c434 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala
@@ -42,7 +42,7 @@ trait InitializeExecutorHandler {
     dp.serializationManager.setOpInitialization(req)
     val workerIdx = VirtualIdentityUtils.getWorkerIndex(actorId)
     cachedTotalWorkerCount = req.totalWorkerCount
-    initializeExecutor(req.opExecInitInfo, workerIdx, cachedTotalWorkerCount)
+    setupExecutor(req.opExecInitInfo, workerIdx, cachedTotalWorkerCount)
     EmptyReturn()
   }
 }
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/UpdateExecutorHandler.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/UpdateExecutorHandler.scala
index 8cb0e30be2..5c2f65c257 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/UpdateExecutorHandler.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/UpdateExecutorHandler.scala
@@ -35,7 +35,7 @@ trait UpdateExecutorHandler {
                                ctx: AsyncRPCContext
                            ): Future[EmptyReturn] = {
     val workerIdx = VirtualIdentityUtils.getWorkerIndex(actorId)
-    initializeExecutor(request.newExecInitInfo, workerIdx, 
cachedTotalWorkerCount)
+    setupExecutor(request.newExecInitInfo, workerIdx, cachedTotalWorkerCount)
     dp.executor.open()
     EmptyReturn()
   }
diff --git 
a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ModifyLogicSpec.scala 
b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ModifyLogicSpec.scala
index 2572a78137..699329bfae 100644
--- 
a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ModifyLogicSpec.scala
+++ 
b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ModifyLogicSpec.scala
@@ -25,7 +25,7 @@ import org.apache.pekko.actor.{ActorSystem, Props}
 import org.apache.pekko.testkit.{ImplicitSender, TestKit}
 import org.apache.pekko.util.Timeout
 import org.apache.texera.amber.clustering.SingleNodeListener
-import org.apache.texera.amber.core.executor.{OpExecInitInfo, OpExecWithCode}
+import org.apache.texera.amber.core.executor.{OpExecInitInfo, 
OpExecWithClassName, OpExecWithCode}
 import org.apache.texera.amber.core.storage.DocumentFactory
 import org.apache.texera.amber.core.storage.model.VirtualDocument
 import org.apache.texera.amber.core.tuple.Tuple
@@ -38,7 +38,7 @@ import org.apache.texera.amber.engine.common.AmberRuntime
 import org.apache.texera.amber.engine.common.client.AmberClient
 import 
org.apache.texera.amber.engine.e2e.TestUtils.{cleanupWorkflowExecutionData, 
initiateTexeraDBForTestCases, setUpWorkflowExecutionData}
 import org.apache.texera.amber.operator.{LogicalOp, TestOperators}
-import org.apache.texera.amber.operator.TestOperators.{pythonOpDesc, 
pythonSourceOpDesc}
+import org.apache.texera.amber.operator.TestOperators.{mediumCsvScanOpDesc, 
pythonOpDesc, pythonSourceOpDesc}
 import 
org.apache.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource.getResultUriByLogicalPortId
 import org.apache.texera.workflow.LogicalLink
 import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, Outcome, Retries}
@@ -64,6 +64,7 @@ class ModifyLogicSpec extends 
TestKit(ActorSystem("ModifyLogicSpec", AmberRuntim
   implicit val timeout: Timeout = Timeout(5.seconds)
 
   val logger = Logger("ModifyLogicSpecLogger")
+  val ctx = new WorkflowContext()
 
   override protected def beforeEach(): Unit = {
     setUpWorkflowExecutionData()
@@ -94,7 +95,7 @@ class ModifyLogicSpec extends 
TestKit(ActorSystem("ModifyLogicSpec", AmberRuntim
                          checkResultLambda: (Map[OperatorIdentity, 
List[Tuple]]) => Boolean
                  ): Unit = {
     val workflow =
-      TestUtils.buildWorkflow(operators, links, new WorkflowContext())
+      TestUtils.buildWorkflow(operators, links, ctx)
     val client =
       new AmberClient(
         system,
@@ -177,4 +178,19 @@ class ModifyLogicSpec extends 
TestKit(ActorSystem("ModifyLogicSpec", AmberRuntim
     )
   }
 
+  "Engine" should "be able to modify a java operator in workflow" in {
+    val sourceOpDesc = mediumCsvScanOpDesc()
+    val keywordMatchNoneOpDesc = TestOperators.keywordSearchOpDesc("Region", 
"ShouldMatchNone")
+    val keywordMatchManyOpDesc = TestOperators.keywordSearchOpDesc("Region", 
"Asia")
+    shouldReconfigure(List(sourceOpDesc, keywordMatchNoneOpDesc), 
List(LogicalLink(
+      sourceOpDesc.operatorIdentifier,
+      PortIdentity(),
+      keywordMatchNoneOpDesc.operatorIdentifier,
+      PortIdentity()
+    )),
+      keywordMatchNoneOpDesc, 
keywordMatchManyOpDesc.getPhysicalOp(ctx.workflowId, 
ctx.executionId).opExecInitInfo,
+      results => results(keywordMatchNoneOpDesc.operatorIdentifier).nonEmpty
+    )
+  }
+
 }

Reply via email to