This is an automated email from the ASF dual-hosted git repository.
aglinxinyuan pushed a commit to branch xinyuan-region-restart
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/xinyuan-region-restart by this
push:
new 20d4bb8bef Add region restart tests
20d4bb8bef is described below
commit 20d4bb8befdc548ae2f4acbe6a9819708e4488e0
Author: Xinyuan Lin <[email protected]>
AuthorDate: Tue Apr 21 18:44:28 2026 -0700
Add region restart tests
---
.../control/utils/TrivialControlTester.scala | 14 +++++---
.../messaginglayer/NetworkInputGatewaySpec.scala | 12 +++++++
.../workflow/WorkflowExecutionsResourceSpec.scala | 41 ++++++++++++++++++++++
3 files changed, 63 insertions(+), 4 deletions(-)
diff --git
a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/control/utils/TrivialControlTester.scala
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/control/utils/TrivialControlTester.scala
index e236ac760b..ca86338d36 100644
---
a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/control/utils/TrivialControlTester.scala
+++
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/control/utils/TrivialControlTester.scala
@@ -24,7 +24,10 @@ import
org.apache.texera.amber.core.virtualidentity.{ActorVirtualIdentity, Chann
import
org.apache.texera.amber.engine.architecture.common.WorkflowActor.NetworkAck
import org.apache.texera.amber.engine.architecture.common.{AmberProcessor,
WorkflowActor}
import
org.apache.texera.amber.engine.architecture.control.utils.TrivialControlTester.ControlTesterRPCClient
-import
org.apache.texera.amber.engine.architecture.messaginglayer.NetworkOutputGateway
+import org.apache.texera.amber.engine.architecture.messaginglayer.{
+ NetworkInputGateway,
+ NetworkOutputGateway
+}
import
org.apache.texera.amber.engine.architecture.rpc.controlcommands.AsyncRPCContext
import
org.apache.texera.amber.engine.architecture.rpc.testerservice.RPCTesterFs2Grpc
import org.apache.texera.amber.engine.common.CheckpointState
@@ -37,8 +40,11 @@ import org.apache.texera.amber.engine.common.ambermessage.{
import org.apache.texera.amber.engine.common.rpc.AsyncRPCClient
object TrivialControlTester {
- class ControlTesterRPCClient(outputGateway: NetworkOutputGateway, actorId:
ActorVirtualIdentity)
- extends AsyncRPCClient(outputGateway, actorId) {
+ class ControlTesterRPCClient(
+ inputGateway: NetworkInputGateway,
+ outputGateway: NetworkOutputGateway,
+ actorId: ActorVirtualIdentity
+ ) extends AsyncRPCClient(inputGateway, outputGateway, actorId) {
val getProxy: RPCTesterFs2Grpc[Future, AsyncRPCContext] =
AsyncRPCClient
.createProxy[RPCTesterFs2Grpc[Future, AsyncRPCContext]](createPromise,
outputGateway)
@@ -55,7 +61,7 @@ class TrivialControlTester(
case Right(value) => transferService.send(value)
}
) {
- override val asyncRPCClient = new ControlTesterRPCClient(outputGateway, id)
+ override val asyncRPCClient = new ControlTesterRPCClient(inputGateway,
outputGateway, id)
}
val initializer =
new TesterAsyncRPCHandlerInitializer(ap.actorId, ap.asyncRPCClient,
ap.asyncRPCServer)
diff --git
a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/messaginglayer/NetworkInputGatewaySpec.scala
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/messaginglayer/NetworkInputGatewaySpec.scala
index 1679144354..04f4f00045 100644
---
a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/messaginglayer/NetworkInputGatewaySpec.scala
+++
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/messaginglayer/NetworkInputGatewaySpec.scala
@@ -84,4 +84,16 @@ class NetworkInputGatewaySpec extends AnyFlatSpec with
MockFactory {
}
+ "network input port" should "remove control channel by sender" in {
+ val inputPort = new NetworkInputGateway(fakeReceiverID)
+ val controlChannelId = ChannelIdentity(fakeSenderID, fakeReceiverID,
isControl = true)
+ inputPort.getChannel(controlChannelId)
+
+ assert(inputPort.getAllControlChannels.size == 1)
+
+ inputPort.removeControlChannel(fakeSenderID)
+
+ assert(inputPort.getAllControlChannels.isEmpty)
+ }
+
}
diff --git
a/amber/src/test/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResourceSpec.scala
b/amber/src/test/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResourceSpec.scala
index 1bd7fcdf6c..973a76cab8 100644
---
a/amber/src/test/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResourceSpec.scala
+++
b/amber/src/test/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResourceSpec.scala
@@ -19,6 +19,13 @@
package org.apache.texera.web.resource.dashboard.user.workflow
+import org.apache.texera.amber.core.virtualidentity.{
+ ExecutionIdentity,
+ OperatorIdentity,
+ PhysicalOpIdentity
+}
+import org.apache.texera.amber.core.workflow.{GlobalPortIdentity, PortIdentity}
+import org.apache.texera.amber.util.serde.GlobalPortIdentitySerde.SerdeOps
import org.apache.texera.dao.MockTexeraDB
import org.apache.texera.dao.jooq.generated.Tables._
import org.apache.texera.dao.jooq.generated.tables.daos.{
@@ -36,6 +43,7 @@ import org.apache.texera.dao.jooq.generated.tables.pojos.{
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach,
PrivateMethodTester}
+import java.net.URI
import java.sql.Timestamp
import java.util.UUID
import java.util.concurrent.TimeUnit
@@ -176,4 +184,37 @@ class WorkflowExecutionsResourceSpec
)
}
+ "WorkflowExecutionsResource.insertOperatorPortResultUri" should "ignore
duplicate inserts" in {
+ val execution = new WorkflowExecutions
+ execution.setVid(testVersion.getVid)
+ execution.setUid(testUser.getUid)
+ execution.setStatus(0.toByte)
+ execution.setResult("")
+ execution.setStartingTime(new Timestamp(System.currentTimeMillis()))
+ execution.setBookmarked(false)
+ execution.setName("Execution with duplicate result URI insert")
+ execution.setEnvironmentVersion("test-env-1.0")
+ workflowExecutionsDao.insert(execution)
+
+ val executionId = ExecutionIdentity(execution.getEid.longValue())
+ val globalPortId = GlobalPortIdentity(
+ PhysicalOpIdentity(OperatorIdentity("operator-1"), "main"),
+ PortIdentity(),
+ input = false
+ )
+ val uri = URI.create("vfs:///test-result")
+
+ WorkflowExecutionsResource.insertOperatorPortResultUri(executionId,
globalPortId, uri)
+ WorkflowExecutionsResource.insertOperatorPortResultUri(executionId,
globalPortId, uri)
+
+ val rows = getDSLContext
+ .selectFrom(OPERATOR_PORT_EXECUTIONS)
+
.where(OPERATOR_PORT_EXECUTIONS.WORKFLOW_EXECUTION_ID.eq(execution.getEid))
+
.and(OPERATOR_PORT_EXECUTIONS.GLOBAL_PORT_ID.eq(globalPortId.serializeAsString))
+ .fetch()
+
+ assert(rows.size() == 1)
+ assert(rows.get(0).getResultUri == uri.toString)
+ }
+
}