This is an automated email from the ASF dual-hosted git repository.

shengquan pushed a commit to branch shengquan-add-reconfigration
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/shengquan-add-reconfigration 
by this push:
     new d422896fba add test cases
d422896fba is described below

commit d422896fba101e8406204198c06700919c0a8306
Author: Shengquan Ni <[email protected]>
AuthorDate: Sat Feb 14 19:52:17 2026 -0800

    add test cases
---
 .../engine/architecture/rpc/controlcommands.proto  |   1 +
 .../amber/engine/common/rpc/AsyncRPCServer.scala   |   4 +-
 .../texera/amber/engine/e2e/ModifyLogicSpec.scala  | 180 +++++++++++++++++++++
 .../texera/amber/operator/TestOperators.scala      |  27 +++-
 4 files changed, 206 insertions(+), 6 deletions(-)

diff --git a/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlcommands.proto b/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlcommands.proto
index 2ce7f9be8e..b4658a63d0 100644
--- a/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlcommands.proto
+++ b/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlcommands.proto
@@ -44,6 +44,7 @@ message ControlRequest {
     PortCompletedRequest portCompletedRequest = 7;
     WorkerStateUpdatedRequest workerStateUpdatedRequest = 8;
     LinkWorkersRequest linkWorkersRequest = 9;
+    WorkflowReconfigureRequest workflowReconfigureRequest = 10;
 
     // request for worker
     AddInputChannelRequest addInputChannelRequest = 50;
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/common/rpc/AsyncRPCServer.scala b/amber/src/main/scala/org/apache/texera/amber/engine/common/rpc/AsyncRPCServer.scala
index e9a3e2cc45..78c1ed206e 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/common/rpc/AsyncRPCServer.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/common/rpc/AsyncRPCServer.scala
@@ -86,7 +86,9 @@ class AsyncRPCServer(
         } catch {
           case e: java.lang.reflect.InvocationTargetException =>
             throw Option(e.getCause).getOrElse(e)
-          case e: Throwable => throw e
+          case e: Throwable =>
+            logger.warn(s"error when invoking ${method.getName} with ${requestArg} and ${contextArg}", e)
+            throw e
         }
       result
         .asInstanceOf[Future[ControlReturn]]
diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ModifyLogicSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ModifyLogicSpec.scala
new file mode 100644
index 0000000000..2572a78137
--- /dev/null
+++ b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ModifyLogicSpec.scala
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.engine.e2e
+
+import com.twitter.util.{Await, Duration, Promise}
+import com.typesafe.scalalogging.Logger
+import org.apache.pekko.actor.{ActorSystem, Props}
+import org.apache.pekko.testkit.{ImplicitSender, TestKit}
+import org.apache.pekko.util.Timeout
+import org.apache.texera.amber.clustering.SingleNodeListener
+import org.apache.texera.amber.core.executor.{OpExecInitInfo, OpExecWithCode}
+import org.apache.texera.amber.core.storage.DocumentFactory
+import org.apache.texera.amber.core.storage.model.VirtualDocument
+import org.apache.texera.amber.core.tuple.Tuple
+import org.apache.texera.amber.core.virtualidentity.OperatorIdentity
+import org.apache.texera.amber.core.workflow.{PortIdentity, WorkflowContext}
+import org.apache.texera.amber.engine.architecture.controller.{ControllerConfig, ExecutionStateUpdate}
+import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{EmptyRequest, UpdateExecutorRequest, WorkflowReconfigureRequest}
+import org.apache.texera.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState.COMPLETED
+import org.apache.texera.amber.engine.common.AmberRuntime
+import org.apache.texera.amber.engine.common.client.AmberClient
+import org.apache.texera.amber.engine.e2e.TestUtils.{cleanupWorkflowExecutionData, initiateTexeraDBForTestCases, setUpWorkflowExecutionData}
+import org.apache.texera.amber.operator.{LogicalOp, TestOperators}
+import org.apache.texera.amber.operator.TestOperators.{pythonOpDesc, pythonSourceOpDesc}
+import org.apache.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource.getResultUriByLogicalPortId
+import org.apache.texera.workflow.LogicalLink
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, Outcome, Retries}
+import org.scalatest.flatspec.AnyFlatSpecLike
+
+import scala.concurrent.duration._
+
+class ModifyLogicSpec extends TestKit(ActorSystem("ModifyLogicSpec", AmberRuntime.akkaConfig))
+  with ImplicitSender
+  with AnyFlatSpecLike
+  with BeforeAndAfterAll
+  with BeforeAndAfterEach
+  with Retries {
+
+  /**
+   * This block retries each test once if it fails.
+   * In the CI environment, there is a chance that executeWorkflow does not receive "COMPLETED" status.
+   * Until we find the root cause of this issue, we use a retry mechanism here to stabilize CI runs.
+   */
+  override def withFixture(test: NoArgTest): Outcome =
+    withRetry { super.withFixture(test) }
+
+  implicit val timeout: Timeout = Timeout(5.seconds)
+
+  val logger = Logger("ModifyLogicSpecLogger")
+
+  override protected def beforeEach(): Unit = {
+    setUpWorkflowExecutionData()
+  }
+
+  override protected def afterEach(): Unit = {
+    cleanupWorkflowExecutionData()
+  }
+
+  override def beforeAll(): Unit = {
+    system.actorOf(Props[SingleNodeListener](), "cluster-info")
+    // These test cases access postgres in CI, but occasionally the jdbc driver cannot be found during CI run.
+    // Explicitly load the JDBC driver to avoid flaky CI failures.
+    Class.forName("org.postgresql.Driver")
+    initiateTexeraDBForTestCases()
+  }
+
+  override def afterAll(): Unit = {
+    TestKit.shutdownActorSystem(system)
+  }
+
+  /** Starts the workflow, swaps targetOp's executor while paused, and asserts checkResultLambda on terminal results. */
+  def shouldReconfigure(
+      operators: List[LogicalOp],
+      links: List[LogicalLink],
+      targetOp: LogicalOp,
+      newOpExecInitInfo: OpExecInitInfo,
+      checkResultLambda: (Map[OperatorIdentity, List[Tuple]]) => Boolean
+  ): Unit = {
+    val workflow =
+      TestUtils.buildWorkflow(operators, links, new WorkflowContext())
+    val client =
+      new AmberClient(
+        system,
+        workflow.context,
+        workflow.physicalPlan,
+        ControllerConfig.default,
+        error => {}
+      )
+    val completion = Promise[Unit]()
+    client
+      .registerCallback[ExecutionStateUpdate](evt => {
+        if (evt.state == COMPLETED) {
+          completion.setDone()
+        }
+      })
+    Await.result(client.controllerInterface.startWorkflow(EmptyRequest(), ()))
+    Await.result(client.controllerInterface.pauseWorkflow(EmptyRequest(), ()))
+    Thread.sleep(4000)
+    val physicalOps = workflow.physicalPlan.getPhysicalOpsOfLogicalOp(targetOp.operatorIdentifier)
+    assert(
+      physicalOps.length == 1,
+      s"expected exactly one physical operator for ${targetOp.operatorIdentifier}, found ${physicalOps.length}"
+    )
+    Await.result(
+      client.controllerInterface.reconfigureWorkflow(
+        WorkflowReconfigureRequest(
+          reconfiguration = physicalOps.map(op => UpdateExecutorRequest(op.id, newOpExecInitInfo)),
+          reconfigurationId = "test-reconfigure-1"
+        ),
+        ()
+      )
+    )
+    Await.result(client.controllerInterface.resumeWorkflow(EmptyRequest(), ()))
+    Await.result(completion, Duration.fromMinutes(1))
+    // TODO: remove the delay after fixing the issue of reporting "completed" status too early.
+    Thread.sleep(1000)
+    // Read and check results on the test thread rather than inside the controller callback,
+    // so a missing result or a failed check fails this test directly.
+    val results: Map[OperatorIdentity, List[Tuple]] =
+      workflow.logicalPlan.getTerminalOperatorIds
+        .flatMap { terminalOpId =>
+          getResultUriByLogicalPortId(workflow.context.executionId, terminalOpId, PortIdentity())
+            .map { uri =>
+              terminalOpId -> DocumentFactory
+                .openDocument(uri)
+                ._1
+                .asInstanceOf[VirtualDocument[Tuple]]
+                .get()
+                .toList
+            }
+        }
+        .toMap
+    assert(checkResultLambda(results),
+      s"result check failed after reconfiguring ${targetOp.operatorIdentifier}")
+  }
+
+  "Engine" should "be able to modify a python UDF worker in workflow" in {
+    val udfOpDesc = pythonOpDesc()
+    val sourceOpDesc = pythonSourceOpDesc(5000)
+    val code = """
+                 |from pytexera import *
+                 |
+                 |class ProcessTupleOperator(UDFOperatorV2):
+                 |    @overrides
+                 |    def process_tuple(self, tuple_: Tuple, port: int) -> Iterator[Optional[TupleLike]]:
+                 |        tuple_['field_2'] = tuple_['field_2'] + '_reconfigured'
+                 |        yield tuple_
+                 |""".stripMargin
+
+    shouldReconfigure(List(sourceOpDesc, udfOpDesc), List(LogicalLink(
+      sourceOpDesc.operatorIdentifier,
+      PortIdentity(),
+      udfOpDesc.operatorIdentifier,
+      PortIdentity()
+    )),
+      udfOpDesc, OpExecWithCode(code, "python"),
+      results => results.getOrElse(udfOpDesc.operatorIdentifier, Nil).exists {
+        t => t.getField[String]("field_2").contains("_reconfigured")
+      }
+    )
+  }
+
+}
diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/TestOperators.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/TestOperators.scala
index ab7e8dc2de..0f6fcd669a 100644
--- a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/TestOperators.scala
+++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/TestOperators.scala
@@ -20,11 +20,8 @@
 package org.apache.texera.amber.operator
 
 import org.apache.texera.amber.core.storage.FileResolver
-import org.apache.texera.amber.operator.aggregate.{
-  AggregateOpDesc,
-  AggregationFunction,
-  AggregationOperation
-}
+import org.apache.texera.amber.core.tuple.{Attribute, AttributeType}
+import org.apache.texera.amber.operator.aggregate.{AggregateOpDesc, AggregationFunction, AggregationOperation}
 import org.apache.texera.amber.operator.hashJoin.HashJoinOpDesc
 import org.apache.texera.amber.operator.keywordSearch.KeywordSearchOpDesc
 import org.apache.texera.amber.operator.source.scan.csv.CSVScanSourceOpDesc
@@ -32,6 +29,7 @@ import org.apache.texera.amber.operator.source.scan.json.JSONLScanSourceOpDesc
 import org.apache.texera.amber.operator.source.sql.asterixdb.AsterixDBSourceOpDesc
 import org.apache.texera.amber.operator.source.sql.mysql.MySQLSourceOpDesc
 import org.apache.texera.amber.operator.udf.python.PythonUDFOpDescV2
+import org.apache.texera.amber.operator.udf.python.source.PythonUDFSourceOpDescV2
 
 import java.nio.file.Path
 
@@ -171,14 +169,33 @@ object TestOperators {
   def pythonOpDesc(): PythonUDFOpDescV2 = {
     val udf = new PythonUDFOpDescV2()
     udf.workers = 1
+    udf.retainInputColumns = true
     udf.code = """
         |from pytexera import *
         |
         |class ProcessTupleOperator(UDFOperatorV2):
         |    @overrides
         |    def process_tuple(self, tuple_: Tuple, port: int) -> Iterator[Optional[TupleLike]]:
         |        yield tuple_
         |""".stripMargin
     udf
   }
+
+  /** Single-worker Python UDF source yielding `num_tuple` rows of (field_1: INTEGER, field_2: STRING). */
+  def pythonSourceOpDesc(num_tuple: Int): PythonUDFSourceOpDescV2 = {
+    val udf = new PythonUDFSourceOpDescV2()
+    udf.workers = 1
+    udf.columns = List(new Attribute("field_1", AttributeType.INTEGER), new Attribute("field_2", AttributeType.STRING))
+    udf.code =
+      s"""
+         |from pytexera import *
+         |
+         |class ProcessTupleOperator(UDFSourceOperator):
+         |    @overrides
+         |    def produce(self) -> Iterator[Union[TupleLike, TableLike, None]]:
+         |        for i in range($num_tuple):
+         |            yield {'field_1': i, 'field_2': str(i)}
+         |""".stripMargin
+    udf
+  }
 }

Reply via email to