This is an automated email from the ASF dual-hosted git repository.

shengquan pushed a commit to branch shengquan-add-reconfigration
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/shengquan-add-reconfigration 
by this push:
     new 7d0342ef70 fix python end
7d0342ef70 is described below

commit 7d0342ef70418fe36bae11510dcf07afc1d2b70f
Author: Shengquan Ni <[email protected]>
AuthorDate: Sun Feb 15 01:37:46 2026 -0800

    fix python end
---
 .../handlers/control/update_executor_handler.py    |   2 +-
 .../rpc/async_rpc_handler_initializer.py           |   2 +
 .../src/main/python/core/models/internal_queue.py  |   3 +
 .../main/python/core/runnables/network_receiver.py |   1 -
 .../promisehandlers/ReconfigurationHandler.scala   |   9 +-
 .../texera/amber/engine/e2e/ModifyLogicSpec.scala  | 130 ++++++++++++++-------
 .../texera/amber/operator/TestOperators.scala      |   5 +-
 7 files changed, 104 insertions(+), 48 deletions(-)

diff --git 
a/amber/src/main/python/core/architecture/handlers/control/update_executor_handler.py
 
b/amber/src/main/python/core/architecture/handlers/control/update_executor_handler.py
index 7449cbb4b5..a79d31c180 100644
--- 
a/amber/src/main/python/core/architecture/handlers/control/update_executor_handler.py
+++ 
b/amber/src/main/python/core/architecture/handlers/control/update_executor_handler.py
@@ -1 +1 @@
-# Licensed to the Apache Software Foundation (ASF) under one # or more 
contributor license agreements.  See the NOTICE file # distributed with this 
work for additional information # regarding copyright ownership.  The ASF 
licenses this file # to you under the Apache License, Version 2.0 (the # 
"License"); you may not use this file except in compliance # with the License.  
You may obtain a copy of the License at # #   
http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable 
law or agreed to in writing, # software distributed under the License is 
distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # 
KIND, either express or implied.  See the License for the # specific language 
governing permissions and limitations # under the License.  from 
core.architecture.handlers.control.control_handler_base import ControlHandler 
from proto.org.apache.texera.amber.engine.architecture.rpc import (     
EmptyReturn,     UpdateExecutorRequest, ) from loguru im
 port logger   class UpdateExecutorHandler(ControlHandler):     async def 
update_executor(self, req: UpdateExecutorRequest) -> EmptyReturn:         
self.context.executor_manager.update_executor(req.code, req.is_source)         
return EmptyReturn()
\ No newline at end of file
+# Licensed to the Apache Software Foundation (ASF) under one # or more 
contributor license agreements.  See the NOTICE file # distributed with this 
work for additional information # regarding copyright ownership.  The ASF 
licenses this file # to you under the Apache License, Version 2.0 (the # 
"License"); you may not use this file except in compliance # with the License.  
You may obtain a copy of the License at # #   
http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable 
law or agreed to in writing, # software distributed under the License is 
distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # 
KIND, either express or implied.  See the License for the # specific language 
governing permissions and limitations # under the License.  from 
core.architecture.handlers.control.control_handler_base import ControlHandler 
from proto.org.apache.texera.amber.engine.architecture.rpc import (     
EmptyReturn,     UpdateExecutorRequest, ) from core.util
  import get_one_of from proto.org.apache.texera.amber.core import 
OpExecWithCode   class UpdateExecutorHandler(ControlHandler):     async def 
update_executor(self, req: UpdateExecutorRequest) -> EmptyReturn:         
op_exec_with_code: OpExecWithCode = get_one_of(req.new_exec_init_info)         
self.context.executor_manager.update_executor(op_exec_with_code.code, False)    
     return EmptyReturn()
\ No newline at end of file
diff --git 
a/amber/src/main/python/core/architecture/rpc/async_rpc_handler_initializer.py 
b/amber/src/main/python/core/architecture/rpc/async_rpc_handler_initializer.py
index c2574028a1..7b60bc27fb 100644
--- 
a/amber/src/main/python/core/architecture/rpc/async_rpc_handler_initializer.py
+++ 
b/amber/src/main/python/core/architecture/rpc/async_rpc_handler_initializer.py
@@ -45,6 +45,7 @@ from 
core.architecture.handlers.control.replay_current_tuple_handler import (
 from core.architecture.handlers.control.resume_worker_handler import 
ResumeWorkerHandler
 from core.architecture.handlers.control.start_channel_handler import 
StartChannelHandler
 from core.architecture.handlers.control.start_worker_handler import 
StartWorkerHandler
+from core.architecture.handlers.control.update_executor_handler import 
UpdateExecutorHandler
 
 
 class AsyncRPCHandlerInitializer(
@@ -64,5 +65,6 @@ class AsyncRPCHandlerInitializer(
     StartChannelHandler,
     EndChannelHandler,
     NoOperationHandler,
+    UpdateExecutorHandler,
 ):
     pass
diff --git a/amber/src/main/python/core/models/internal_queue.py 
b/amber/src/main/python/core/models/internal_queue.py
index abc1793ff6..ae0247aa2d 100644
--- a/amber/src/main/python/core/models/internal_queue.py
+++ b/amber/src/main/python/core/models/internal_queue.py
@@ -79,6 +79,9 @@ class InternalQueue(IQueue):
             if item.tag not in self._queue_ids:
                 self._queue.add_sub_queue(item.tag, 1 if item.tag.is_control 
else 2)
                 self._queue_ids.add(item.tag)
+                if len(self._queue_state) > 0 and not item.tag.is_control:
+                    # if paused, then the newly added queue will also be 
paused.
+                    self._queue.disable(item.tag)
             if isinstance(item, (DataElement, InternalMarker, ECMElement)):
                 self._queue.put(item.tag, item)
             elif isinstance(item, DCMElement):
diff --git a/amber/src/main/python/core/runnables/network_receiver.py 
b/amber/src/main/python/core/runnables/network_receiver.py
index 879b623b76..fd42a8f589 100644
--- a/amber/src/main/python/core/runnables/network_receiver.py
+++ b/amber/src/main/python/core/runnables/network_receiver.py
@@ -126,7 +126,6 @@ class NetworkReceiver(Runnable, Stoppable):
                     payload=python_control_message.payload,
                 )
             )
-            logger.info(python_control_message.payload)
             return shared_queue.in_mem_size()
 
         self._proxy_server.register_control_handler(control_handler)
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/ReconfigurationHandler.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/ReconfigurationHandler.scala
index 573306bc06..dd31aae420 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/ReconfigurationHandler.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/ReconfigurationHandler.scala
@@ -37,7 +37,14 @@ trait ReconfigurationHandler {
 
   override def reconfigureWorkflow(msg: WorkflowReconfigureRequest, ctx: 
AsyncRPCContext): Future[EmptyReturn] = {
     val futures = mutable.ArrayBuffer[Future[_]]()
-    
FriesReconfigurationAlgorithm.getReconfigurations(cp.workflowExecutionCoordinator,
 msg).foreach{
+    val friesComponents = 
FriesReconfigurationAlgorithm.getReconfigurations(cp.workflowExecutionCoordinator,
 msg)
+    val needToSendECMToSources = friesComponents.exists(comp => 
comp.sources.exists(sourceOp => 
cp.workflowScheduler.physicalPlan.getOperator(sourceOp).isSourceOperator))
+    if(needToSendECMToSources){
+      throw new IllegalStateException(
+        "Reconfiguration cannot be propagated through source operators"
+      )
+    }
+    friesComponents.foreach{
       friesComponent =>
         if(friesComponent.scope.size == 1){
           val updateExecutorRequest = friesComponent.reconfigurations.head
diff --git 
a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ModifyLogicSpec.scala 
b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ModifyLogicSpec.scala
index 699329bfae..41016a8144 100644
--- 
a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ModifyLogicSpec.scala
+++ 
b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ModifyLogicSpec.scala
@@ -90,10 +90,9 @@ class ModifyLogicSpec extends 
TestKit(ActorSystem("ModifyLogicSpec", AmberRuntim
   def shouldReconfigure(
                          operators: List[LogicalOp],
                          links: List[LogicalLink],
-                         targetOp: LogicalOp,
-                         newOpExecInitInfo: OpExecInitInfo,
-                         checkResultLambda: (Map[OperatorIdentity, 
List[Tuple]]) => Boolean
-                 ): Unit = {
+                         targetOps: Seq[LogicalOp],
+                         newOpExecInitInfo: OpExecInitInfo
+                 ): Map[OperatorIdentity, List[Tuple]] = {
     val workflow =
       TestUtils.buildWorkflow(operators, links, ctx)
     val client =
@@ -105,43 +104,42 @@ class ModifyLogicSpec extends 
TestKit(ActorSystem("ModifyLogicSpec", AmberRuntim
         error => {}
       )
     val completion = Promise[Unit]()
+    var result: Map[OperatorIdentity, List[Tuple]] = null
     client
       .registerCallback[ExecutionStateUpdate](evt => {
         if (evt.state == COMPLETED) {
-//          checkResultLambda(workflow.logicalPlan.getTerminalOperatorIds
-//            .filter(terminalOpId => {
-//              val uri = getResultUriByLogicalPortId(
-//                workflow.context.executionId,
-//                terminalOpId,
-//                PortIdentity()
-//              )
-//              uri.nonEmpty
-//            })
-//            .map(terminalOpId => {
-//              //TODO: remove the delay after fixing the issue of reporting 
"completed" status too early.
-//              Thread.sleep(1000)
-//              val uri = getResultUriByLogicalPortId(
-//                workflow.context.executionId,
-//                terminalOpId,
-//                PortIdentity()
-//              ).get
-//              terminalOpId -> DocumentFactory
-//                .openDocument(uri)
-//                ._1
-//                .asInstanceOf[VirtualDocument[Tuple]]
-//                .get()
-//                .toList
-//            })
-//            .toMap)
+          result = workflow.logicalPlan.getTerminalOperatorIds
+            .filter(terminalOpId => {
+              val uri = getResultUriByLogicalPortId(
+                workflow.context.executionId,
+                terminalOpId,
+                PortIdentity()
+              )
+              uri.nonEmpty
+            })
+            .map(terminalOpId => {
+              //TODO: remove the delay after fixing the issue of reporting 
"completed" status too early.
+              Thread.sleep(1000)
+              val uri = getResultUriByLogicalPortId(
+                workflow.context.executionId,
+                terminalOpId,
+                PortIdentity()
+              ).get
+              terminalOpId -> DocumentFactory
+                .openDocument(uri)
+                ._1
+                .asInstanceOf[VirtualDocument[Tuple]]
+                .get()
+                .toList
+            })
+            .toMap
           completion.setDone()
         }
       })
     Await.result(client.controllerInterface.startWorkflow(EmptyRequest(), ()))
     Await.result(client.controllerInterface.pauseWorkflow(EmptyRequest(), ()))
     Thread.sleep(4000)
-    val physicalOps = 
workflow.physicalPlan.getPhysicalOpsOfLogicalOp(targetOp.operatorIdentifier)
-    assert(physicalOps.nonEmpty && physicalOps.length == 1,
-      "cannot reconfigure more than one physical operator in this test")
+    val physicalOps = targetOps.flatMap(op => 
workflow.physicalPlan.getPhysicalOpsOfLogicalOp(op.operatorIdentifier))
     
Await.result(client.controllerInterface.reconfigureWorkflow(WorkflowReconfigureRequest(
       reconfiguration =
         physicalOps.map(op => UpdateExecutorRequest(op.id, newOpExecInitInfo)
@@ -149,48 +147,94 @@ class ModifyLogicSpec extends 
TestKit(ActorSystem("ModifyLogicSpec", AmberRuntim
     Await.result(client.controllerInterface.resumeWorkflow(EmptyRequest(), ()))
     Thread.sleep(400)
     Await.result(completion, Duration.fromMinutes(1))
+    result
   }
 
 
   "Engine" should "be able to modify a python UDF worker in workflow" in {
+    val sourceOpDesc = TestOperators.smallCsvScanOpDesc()
     val udfOpDesc = pythonOpDesc()
-    val sourceOpDesc = pythonSourceOpDesc(5000)
     val code = """
                  |from pytexera import *
                  |
                  |class ProcessTupleOperator(UDFOperatorV2):
                  |    @overrides
                  |    def process_tuple(self, tuple_: Tuple, port: int) -> 
Iterator[Optional[TupleLike]]:
-                 |        tuple_['field_2'] = tuple_['field_2'] + 
'_reconfigured'
+                 |        tuple_['Region'] = tuple_['Region'] + '_reconfigured'
                  |        yield tuple_
                  |""".stripMargin
 
-    shouldReconfigure(List(sourceOpDesc, udfOpDesc), List(LogicalLink(
+    val result = shouldReconfigure(List(sourceOpDesc, udfOpDesc), 
List(LogicalLink(
       sourceOpDesc.operatorIdentifier,
       PortIdentity(),
       udfOpDesc.operatorIdentifier,
       PortIdentity()
     )),
-      udfOpDesc, OpExecWithCode(code, "python"),
-      results => results(udfOpDesc.operatorIdentifier).exists {
-        t => 
t.getField("field_2").asInstanceOf[String].contains("_reconfigured")
-      }
-    )
+      Seq(udfOpDesc), OpExecWithCode(code, "python"))
+  assert(result(udfOpDesc.operatorIdentifier).exists {
+    t => t.getField("Region").asInstanceOf[String].contains("_reconfigured")
+  })
   }
 
   "Engine" should "be able to modify a java operator in workflow" in {
     val sourceOpDesc = mediumCsvScanOpDesc()
     val keywordMatchNoneOpDesc = TestOperators.keywordSearchOpDesc("Region", 
"ShouldMatchNone")
     val keywordMatchManyOpDesc = TestOperators.keywordSearchOpDesc("Region", 
"Asia")
-    shouldReconfigure(List(sourceOpDesc, keywordMatchNoneOpDesc), 
List(LogicalLink(
+    val result = shouldReconfigure(List(sourceOpDesc, keywordMatchNoneOpDesc), 
List(LogicalLink(
       sourceOpDesc.operatorIdentifier,
       PortIdentity(),
       keywordMatchNoneOpDesc.operatorIdentifier,
       PortIdentity()
     )),
-      keywordMatchNoneOpDesc, 
keywordMatchManyOpDesc.getPhysicalOp(ctx.workflowId, 
ctx.executionId).opExecInitInfo,
-      results => results(keywordMatchNoneOpDesc.operatorIdentifier).nonEmpty
+      Seq(keywordMatchNoneOpDesc), 
keywordMatchManyOpDesc.getPhysicalOp(ctx.workflowId, 
ctx.executionId).opExecInitInfo
     )
+    assert(result(keywordMatchNoneOpDesc.operatorIdentifier).nonEmpty)
+  }
+
+  "Engine" should "not be able to modify a source operator in workflow" in {
+    val sourceOpDesc = mediumCsvScanOpDesc()
+    val keywordMatchNoneOpDesc = TestOperators.keywordSearchOpDesc("Region", 
"ShouldMatchNone")
+    val ex = intercept[Throwable] {
+      shouldReconfigure(List(sourceOpDesc, keywordMatchNoneOpDesc), 
List(LogicalLink(
+      sourceOpDesc.operatorIdentifier,
+      PortIdentity(),
+      keywordMatchNoneOpDesc.operatorIdentifier,
+      PortIdentity()
+    )),
+      Seq(sourceOpDesc), sourceOpDesc.getPhysicalOp(ctx.workflowId, 
ctx.executionId).opExecInitInfo
+    )}
+    assert(ex.getMessage == "java.lang.IllegalStateException: Reconfiguration 
cannot be propagated through source operators")
+  }
+
+  "Engine" should "be able to modify two python UDFs in workflow" in {
+    val sourceOpDesc = TestOperators.smallCsvScanOpDesc()
+    val udfOpDesc1 = pythonOpDesc()
+    val udfOpDesc2 = pythonOpDesc()
+    val code = """
+                 |from pytexera import *
+                 |
+                 |class ProcessTupleOperator(UDFOperatorV2):
+                 |    @overrides
+                 |    def process_tuple(self, tuple_: Tuple, port: int) -> 
Iterator[Optional[TupleLike]]:
+                 |        tuple_['Region'] = tuple_['Region'] + '_reconfigured'
+                 |        yield tuple_
+                 |""".stripMargin
+
+    val result = shouldReconfigure(List(sourceOpDesc, udfOpDesc1, udfOpDesc2), 
List(LogicalLink(
+      sourceOpDesc.operatorIdentifier,
+      PortIdentity(),
+      udfOpDesc1.operatorIdentifier,
+      PortIdentity()
+    ), LogicalLink(
+      udfOpDesc1.operatorIdentifier,
+      PortIdentity(),
+      udfOpDesc2.operatorIdentifier,
+      PortIdentity()
+    )),
+      Seq(udfOpDesc1, udfOpDesc2), OpExecWithCode(code, "python"))
+    assert(result(udfOpDesc2.operatorIdentifier).exists {
+      t => 
t.getField("Region").asInstanceOf[String].contains("_reconfigured_reconfigured")
+    })
   }
 
 }
diff --git 
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/TestOperators.scala
 
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/TestOperators.scala
index 0f6fcd669a..0dedd09e1f 100644
--- 
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/TestOperators.scala
+++ 
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/TestOperators.scala
@@ -182,18 +182,19 @@ object TestOperators {
     udf
   }
 
-  def pythonSourceOpDesc(num_tuple: Int): PythonUDFSourceOpDescV2 = {
+  def pythonSourceOpDesc(num_tuple: Int, delay_in_sec: Double): 
PythonUDFSourceOpDescV2 = {
     val udf = new PythonUDFSourceOpDescV2()
     udf.workers = 1
     udf.columns = List(new Attribute("field_1", AttributeType.INTEGER), new 
Attribute("field_2", AttributeType.STRING))
     udf.code =
       s"""
          |from pytexera import *
-         |
+         |from time import sleep
          |class ProcessTupleOperator(UDFSourceOperator):
          |    @overrides
          |    def produce(self) -> Iterator[Union[TupleLike, TableLike, None]]:
          |        for i in range($num_tuple):
+         |          sleep($delay_in_sec)
          |          yield {'field_1': i, 'field_2': str(i)}
          |""".stripMargin
     udf

Reply via email to