This is an automated email from the ASF dual-hosted git repository.
shengquan pushed a commit to branch shengquan-add-reconfigration
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/shengquan-add-reconfigration
by this push:
new d422896fba add test cases
d422896fba is described below
commit d422896fba101e8406204198c06700919c0a8306
Author: Shengquan Ni <[email protected]>
AuthorDate: Sat Feb 14 19:52:17 2026 -0800
add test cases
---
.../engine/architecture/rpc/controlcommands.proto | 1 +
.../amber/engine/common/rpc/AsyncRPCServer.scala | 4 +-
.../texera/amber/engine/e2e/ModifyLogicSpec.scala | 180 +++++++++++++++++++++
.../texera/amber/operator/TestOperators.scala | 27 +++-
4 files changed, 206 insertions(+), 6 deletions(-)
diff --git
a/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlcommands.proto
b/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlcommands.proto
index 2ce7f9be8e..b4658a63d0 100644
---
a/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlcommands.proto
+++
b/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlcommands.proto
@@ -44,6 +44,7 @@ message ControlRequest {
PortCompletedRequest portCompletedRequest = 7;
WorkerStateUpdatedRequest workerStateUpdatedRequest = 8;
LinkWorkersRequest linkWorkersRequest = 9;
+ WorkflowReconfigureRequest workflowReconfigureRequest = 10;
// request for worker
AddInputChannelRequest addInputChannelRequest = 50;
diff --git
a/amber/src/main/scala/org/apache/texera/amber/engine/common/rpc/AsyncRPCServer.scala
b/amber/src/main/scala/org/apache/texera/amber/engine/common/rpc/AsyncRPCServer.scala
index e9a3e2cc45..78c1ed206e 100644
---
a/amber/src/main/scala/org/apache/texera/amber/engine/common/rpc/AsyncRPCServer.scala
+++
b/amber/src/main/scala/org/apache/texera/amber/engine/common/rpc/AsyncRPCServer.scala
@@ -86,7 +86,9 @@ class AsyncRPCServer(
} catch {
case e: java.lang.reflect.InvocationTargetException =>
throw Option(e.getCause).getOrElse(e)
- case e: Throwable => throw e
+ case e: Throwable =>
+ logger.info(s"error when invoking ${method.getName} +
${requestArg} with ${contextArg}")
+ throw e
}
result
.asInstanceOf[Future[ControlReturn]]
diff --git
a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ModifyLogicSpec.scala
b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ModifyLogicSpec.scala
new file mode 100644
index 0000000000..2572a78137
--- /dev/null
+++
b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ModifyLogicSpec.scala
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.engine.e2e
+
+import com.twitter.util.{Await, Duration, Promise}
+import com.typesafe.scalalogging.Logger
+import org.apache.pekko.actor.{ActorSystem, Props}
+import org.apache.pekko.testkit.{ImplicitSender, TestKit}
+import org.apache.pekko.util.Timeout
+import org.apache.texera.amber.clustering.SingleNodeListener
+import org.apache.texera.amber.core.executor.{OpExecInitInfo, OpExecWithCode}
+import org.apache.texera.amber.core.storage.DocumentFactory
+import org.apache.texera.amber.core.storage.model.VirtualDocument
+import org.apache.texera.amber.core.tuple.Tuple
+import org.apache.texera.amber.core.virtualidentity.OperatorIdentity
+import org.apache.texera.amber.core.workflow.{PortIdentity, WorkflowContext}
+import
org.apache.texera.amber.engine.architecture.controller.{ControllerConfig,
ExecutionStateUpdate}
+import
org.apache.texera.amber.engine.architecture.rpc.controlcommands.{EmptyRequest,
UpdateExecutorRequest, WorkflowReconfigureRequest}
+import
org.apache.texera.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState.COMPLETED
+import org.apache.texera.amber.engine.common.AmberRuntime
+import org.apache.texera.amber.engine.common.client.AmberClient
+import
org.apache.texera.amber.engine.e2e.TestUtils.{cleanupWorkflowExecutionData,
initiateTexeraDBForTestCases, setUpWorkflowExecutionData}
+import org.apache.texera.amber.operator.{LogicalOp, TestOperators}
+import org.apache.texera.amber.operator.TestOperators.{pythonOpDesc,
pythonSourceOpDesc}
+import
org.apache.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource.getResultUriByLogicalPortId
+import org.apache.texera.workflow.LogicalLink
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, Outcome, Retries}
+import org.scalatest.flatspec.AnyFlatSpecLike
+
+import scala.concurrent.duration._
+
+class ModifyLogicSpec extends TestKit(ActorSystem("ModifyLogicSpec",
AmberRuntime.akkaConfig))
+ with ImplicitSender
+ with AnyFlatSpecLike
+ with BeforeAndAfterAll
+ with BeforeAndAfterEach
+ with Retries {
+
+ /**
+ * This block retries each test once if it fails.
+ * In the CI environment, there is a chance that shouldReconfigure does not
receive the "COMPLETED" status.
+ * Until we find the root cause of this issue, we use a retry mechanism here
to stabilize CI runs.
+ */
+ override def withFixture(test: NoArgTest): Outcome =
+ withRetry { super.withFixture(test) }
+
+ implicit val timeout: Timeout = Timeout(5.seconds)
+
+ val logger = Logger("ModifyLogicSpecLogger")
+
+ override protected def beforeEach(): Unit = {
+ setUpWorkflowExecutionData()
+ }
+
+ override protected def afterEach(): Unit = {
+ cleanupWorkflowExecutionData()
+ }
+
+ override def beforeAll(): Unit = {
+ system.actorOf(Props[SingleNodeListener](), "cluster-info")
+ // These test cases access Postgres in CI, but occasionally the JDBC
driver cannot be found during a CI run.
+ // Explicitly load the JDBC driver to avoid flaky CI failures.
+ Class.forName("org.postgresql.Driver")
+ initiateTexeraDBForTestCases()
+ }
+
+ override def afterAll(): Unit = {
+ TestKit.shutdownActorSystem(system)
+ }
+
+
+ def shouldReconfigure(
+ operators: List[LogicalOp],
+ links: List[LogicalLink],
+ targetOp: LogicalOp,
+ newOpExecInitInfo: OpExecInitInfo,
+ checkResultLambda: (Map[OperatorIdentity,
List[Tuple]]) => Boolean
+ ): Unit = {
+ val workflow =
+ TestUtils.buildWorkflow(operators, links, new WorkflowContext())
+ val client =
+ new AmberClient(
+ system,
+ workflow.context,
+ workflow.physicalPlan,
+ ControllerConfig.default,
+ error => {}
+ )
+ val completion = Promise[Unit]()
+ client
+ .registerCallback[ExecutionStateUpdate](evt => {
+ if (evt.state == COMPLETED) {
+// checkResultLambda(workflow.logicalPlan.getTerminalOperatorIds
+// .filter(terminalOpId => {
+// val uri = getResultUriByLogicalPortId(
+// workflow.context.executionId,
+// terminalOpId,
+// PortIdentity()
+// )
+// uri.nonEmpty
+// })
+// .map(terminalOpId => {
+// //TODO: remove the delay after fixing the issue of reporting
"completed" status too early.
+// Thread.sleep(1000)
+// val uri = getResultUriByLogicalPortId(
+// workflow.context.executionId,
+// terminalOpId,
+// PortIdentity()
+// ).get
+// terminalOpId -> DocumentFactory
+// .openDocument(uri)
+// ._1
+// .asInstanceOf[VirtualDocument[Tuple]]
+// .get()
+// .toList
+// })
+// .toMap)
+ completion.setDone()
+ }
+ })
+ Await.result(client.controllerInterface.startWorkflow(EmptyRequest(), ()))
+ Await.result(client.controllerInterface.pauseWorkflow(EmptyRequest(), ()))
+ Thread.sleep(4000)
+ val physicalOps =
workflow.physicalPlan.getPhysicalOpsOfLogicalOp(targetOp.operatorIdentifier)
+ assert(physicalOps.nonEmpty && physicalOps.length == 1,
+ "cannot reconfigure more than one physical operator in this test")
+
Await.result(client.controllerInterface.reconfigureWorkflow(WorkflowReconfigureRequest(
+ reconfiguration =
+ physicalOps.map(op => UpdateExecutorRequest(op.id, newOpExecInitInfo)
+ ), reconfigurationId = "test-reconfigure-1"), ()))
+ Await.result(client.controllerInterface.resumeWorkflow(EmptyRequest(), ()))
+ Thread.sleep(400)
+ Await.result(completion, Duration.fromMinutes(1))
+ }
+
+
+ "Engine" should "be able to modify a python UDF worker in workflow" in {
+ val udfOpDesc = pythonOpDesc()
+ val sourceOpDesc = pythonSourceOpDesc(5000)
+ val code = """
+ |from pytexera import *
+ |
+ |class ProcessTupleOperator(UDFOperatorV2):
+ | @overrides
+ | def process_tuple(self, tuple_: Tuple, port: int) ->
Iterator[Optional[TupleLike]]:
+ | tuple_['field_2'] = tuple_['field_2'] +
'_reconfigured'
+ | yield tuple_
+ |""".stripMargin
+
+ shouldReconfigure(List(sourceOpDesc, udfOpDesc), List(LogicalLink(
+ sourceOpDesc.operatorIdentifier,
+ PortIdentity(),
+ udfOpDesc.operatorIdentifier,
+ PortIdentity()
+ )),
+ udfOpDesc, OpExecWithCode(code, "python"),
+ results => results(udfOpDesc.operatorIdentifier).exists {
+ t =>
t.getField("field_2").asInstanceOf[String].contains("_reconfigured")
+ }
+ )
+ }
+
+}
diff --git
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/TestOperators.scala
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/TestOperators.scala
index ab7e8dc2de..0f6fcd669a 100644
---
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/TestOperators.scala
+++
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/TestOperators.scala
@@ -20,11 +20,8 @@
package org.apache.texera.amber.operator
import org.apache.texera.amber.core.storage.FileResolver
-import org.apache.texera.amber.operator.aggregate.{
- AggregateOpDesc,
- AggregationFunction,
- AggregationOperation
-}
+import org.apache.texera.amber.core.tuple.{Attribute, AttributeType}
+import org.apache.texera.amber.operator.aggregate.{AggregateOpDesc,
AggregationFunction, AggregationOperation}
import org.apache.texera.amber.operator.hashJoin.HashJoinOpDesc
import org.apache.texera.amber.operator.keywordSearch.KeywordSearchOpDesc
import org.apache.texera.amber.operator.source.scan.csv.CSVScanSourceOpDesc
@@ -32,6 +29,7 @@ import
org.apache.texera.amber.operator.source.scan.json.JSONLScanSourceOpDesc
import
org.apache.texera.amber.operator.source.sql.asterixdb.AsterixDBSourceOpDesc
import org.apache.texera.amber.operator.source.sql.mysql.MySQLSourceOpDesc
import org.apache.texera.amber.operator.udf.python.PythonUDFOpDescV2
+import
org.apache.texera.amber.operator.udf.python.source.PythonUDFSourceOpDescV2
import java.nio.file.Path
@@ -171,14 +169,33 @@ object TestOperators {
def pythonOpDesc(): PythonUDFOpDescV2 = {
val udf = new PythonUDFOpDescV2()
udf.workers = 1
+ udf.retainInputColumns = true
udf.code = """
|from pytexera import *
|
|class ProcessTupleOperator(UDFOperatorV2):
| @overrides
| def process_tuple(self, tuple_: Tuple, port: int) ->
Iterator[Optional[TupleLike]]:
+ | print(tuple_)
| yield tuple_
|""".stripMargin
udf
}
+
+ def pythonSourceOpDesc(num_tuple: Int): PythonUDFSourceOpDescV2 = {
+ val udf = new PythonUDFSourceOpDescV2()
+ udf.workers = 1
+ udf.columns = List(new Attribute("field_1", AttributeType.INTEGER), new
Attribute("field_2", AttributeType.STRING))
+ udf.code =
+ s"""
+ |from pytexera import *
+ |
+ |class ProcessTupleOperator(UDFSourceOperator):
+ | @overrides
+ | def produce(self) -> Iterator[Union[TupleLike, TableLike, None]]:
+ | for i in range($num_tuple):
+ | yield {'field_1': i, 'field_2': str(i)}
+ |""".stripMargin
+ udf
+ }
}