This is an automated email from the ASF dual-hosted git repository.
github-merge-queue[bot] pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new 8803d084cd fix(test): isolate e2e suites by unique workflow/execution
id (#5888)
8803d084cd is described below
commit 8803d084cd68fc98c06c782aa7a56450e7ddc9ee
Author: Matthew B. <[email protected]>
AuthorDate: Mon Jun 22 22:30:09 2026 -0700
fix(test): isolate e2e suites by unique workflow/execution id (#5888)
### What changes were proposed in this PR?
- Add `TestUtils.workflowContext(id, workflowSettings)` that sets both
`workflowId` and `executionId` to `id`, and make the DB fixtures plus
`setUpWorkflowExecutionData`/`cleanupWorkflowExecutionData` take an `id`
(the user email is derived from `id` to avoid the unique-email collision).
- Give each materializing e2e suite a distinct id so concurrent suites
no longer share an Iceberg result keyspace or DB rows:
DataProcessingSpec=1, PauseSpec=2, ReconfigurationSpec=3,
ReconfigurationIntegrationSpec=4.
- Test-only change, no production code; BatchSizePropagationSpec and
CheckpointSpec are untouched because they do not materialize results.
### Any related issues, documentation, discussions?
Closes: #5887
### How was this PR tested?
- `sbt "WorkflowExecutionService/Test/compile"` compiles cleanly on Java
17.
- Run the previously-flaky suite against the integration services
(Postgres test DB + MinIO/S3 + Iceberg catalog, as in the `build /
amber` CI job): `sbt "WorkflowExecutionService/testOnly
*DataProcessingSpec"`; expect all DataProcessingSpec tests green with no
CommitFailedException flake.
- The timing-dependent flake could not be reproduced locally (no
MinIO/Iceberg env), so final verification is the `build / amber` CI job
staying green across re-runs.
### Was this PR authored or co-authored using generative AI tooling?
Co-authored with Claude Opus 4.8, in compliance with the ASF Generative
Tooling Guidance.
---
.../e2e/ReconfigurationIntegrationSpec.scala | 15 ++---
.../amber/engine/e2e/DataProcessingSpec.scala | 13 ++--
.../apache/texera/amber/engine/e2e/PauseSpec.scala | 10 +--
.../amber/engine/e2e/ReconfigurationSpec.scala | 9 +--
.../apache/texera/amber/engine/e2e/TestUtils.scala | 72 ++++++++++++++--------
5 files changed, 74 insertions(+), 45 deletions(-)
diff --git
a/amber/src/test/integration/org/apache/texera/amber/engine/e2e/ReconfigurationIntegrationSpec.scala
b/amber/src/test/integration/org/apache/texera/amber/engine/e2e/ReconfigurationIntegrationSpec.scala
index 6f0936da28..5d2ed7e5e4 100644
---
a/amber/src/test/integration/org/apache/texera/amber/engine/e2e/ReconfigurationIntegrationSpec.scala
+++
b/amber/src/test/integration/org/apache/texera/amber/engine/e2e/ReconfigurationIntegrationSpec.scala
@@ -28,7 +28,7 @@ import org.apache.texera.amber.clustering.SingleNodeListener
import org.apache.texera.amber.core.executor.{OpExecInitInfo, OpExecWithCode}
import org.apache.texera.amber.core.tuple.Tuple
import org.apache.texera.amber.core.virtualidentity.OperatorIdentity
-import org.apache.texera.amber.core.workflow.{PortIdentity, WorkflowContext}
+import org.apache.texera.amber.core.workflow.PortIdentity
import org.apache.texera.amber.engine.architecture.controller.{
ControllerConfig,
ExecutionStateUpdate
@@ -80,14 +80,15 @@ class ReconfigurationIntegrationSpec
implicit val timeout: Timeout = Timeout(5.seconds)
val logger = Logger("ReconfigurationIntegrationSpecLogger")
- val ctx = new WorkflowContext()
+ private val specId = 4
+ val ctx = TestUtils.workflowContext(specId)
override protected def beforeEach(): Unit = {
- setUpWorkflowExecutionData()
+ setUpWorkflowExecutionData(specId)
}
override protected def afterEach(): Unit = {
- cleanupWorkflowExecutionData()
+ cleanupWorkflowExecutionData(specId)
}
override def beforeAll(): Unit = {
@@ -117,12 +118,12 @@ class ReconfigurationIntegrationSpec
*/
private def warmupOnce(): Unit = {
val warmupCap = Duration.fromSeconds(10)
- setUpWorkflowExecutionData()
+ setUpWorkflowExecutionData(specId)
var client: AmberClient = null
try {
val src = new TextInputSourceOpDesc()
src.textInput = "warmup"
- val warmupCtx = new WorkflowContext()
+ val warmupCtx = TestUtils.workflowContext(specId)
val workflow = buildWorkflow(List(src), List.empty, warmupCtx)
client = new AmberClient(
system,
@@ -150,7 +151,7 @@ class ReconfigurationIntegrationSpec
try client.shutdown()
catch { case _: Throwable => () }
}
- cleanupWorkflowExecutionData()
+ cleanupWorkflowExecutionData(specId)
}
}
diff --git
a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/DataProcessingSpec.scala
b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/DataProcessingSpec.scala
index 2606d9d656..6dde4eff9f 100644
---
a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/DataProcessingSpec.scala
+++
b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/DataProcessingSpec.scala
@@ -66,21 +66,24 @@ class DataProcessingSpec
implicit val timeout: Timeout = Timeout(5.seconds)
- val workflowContext: WorkflowContext = new WorkflowContext()
+ private val specId = 1
- val materializedWorkflowContext: WorkflowContext = new WorkflowContext(
- workflowSettings = WorkflowSettings(
+ val workflowContext: WorkflowContext = TestUtils.workflowContext(specId)
+
+ val materializedWorkflowContext: WorkflowContext = TestUtils.workflowContext(
+ specId,
+ WorkflowSettings(
dataTransferBatchSize = 400,
executionMode = ExecutionMode.MATERIALIZED
)
)
override protected def beforeEach(): Unit = {
- setUpWorkflowExecutionData()
+ setUpWorkflowExecutionData(specId)
}
override protected def afterEach(): Unit = {
- cleanupWorkflowExecutionData()
+ cleanupWorkflowExecutionData(specId)
}
override def beforeAll(): Unit = {
diff --git
a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/PauseSpec.scala
b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/PauseSpec.scala
index 2cc268608f..036caefa0a 100644
--- a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/PauseSpec.scala
+++ b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/PauseSpec.scala
@@ -25,7 +25,7 @@ import org.apache.pekko.util.Timeout
import com.twitter.util.{Await, Duration, Promise}
import com.typesafe.scalalogging.Logger
import org.apache.texera.amber.clustering.SingleNodeListener
-import org.apache.texera.amber.core.workflow.{PortIdentity, WorkflowContext}
+import org.apache.texera.amber.core.workflow.PortIdentity
import org.apache.texera.amber.engine.architecture.controller.{
ControllerConfig,
ExecutionStateUpdate
@@ -70,12 +70,14 @@ class PauseSpec
val logger = Logger("PauseSpecLogger")
+ private val specId = 2
+
override protected def beforeEach(): Unit = {
- setUpWorkflowExecutionData()
+ setUpWorkflowExecutionData(specId)
}
override protected def afterEach(): Unit = {
- cleanupWorkflowExecutionData()
+ cleanupWorkflowExecutionData(specId)
}
override def beforeAll(): Unit = {
@@ -95,7 +97,7 @@ class PauseSpec
links: List[LogicalLink]
): Unit = {
val workflow =
- TestUtils.buildWorkflow(operators, links, new WorkflowContext())
+ TestUtils.buildWorkflow(operators, links,
TestUtils.workflowContext(specId))
val client =
new AmberClient(
system,
diff --git
a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ReconfigurationSpec.scala
b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ReconfigurationSpec.scala
index 2cd3559736..6dfb23f6ac 100644
---
a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ReconfigurationSpec.scala
+++
b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ReconfigurationSpec.scala
@@ -27,7 +27,7 @@ import org.apache.texera.amber.clustering.SingleNodeListener
import org.apache.texera.amber.core.executor.OpExecInitInfo
import org.apache.texera.amber.core.tuple.Tuple
import org.apache.texera.amber.core.virtualidentity.OperatorIdentity
-import org.apache.texera.amber.core.workflow.{PortIdentity, WorkflowContext}
+import org.apache.texera.amber.core.workflow.PortIdentity
import org.apache.texera.amber.engine.common.AmberRuntime
import org.apache.texera.amber.engine.e2e.TestUtils.{
cleanupWorkflowExecutionData,
@@ -60,14 +60,15 @@ class ReconfigurationSpec
implicit val timeout: Timeout = Timeout(5.seconds)
val logger = Logger("ReconfigurationSpecLogger")
- val ctx = new WorkflowContext()
+ private val specId = 3
+ val ctx = TestUtils.workflowContext(specId)
override protected def beforeEach(): Unit = {
- setUpWorkflowExecutionData()
+ setUpWorkflowExecutionData(specId)
}
override protected def afterEach(): Unit = {
- cleanupWorkflowExecutionData()
+ cleanupWorkflowExecutionData(specId)
}
override def beforeAll(): Unit = {
diff --git
a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/TestUtils.scala
b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/TestUtils.scala
index 2a87fe3490..ac71483a5d 100644
--- a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/TestUtils.scala
+++ b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/TestUtils.scala
@@ -26,8 +26,12 @@ import org.apache.texera.amber.core.executor.OpExecInitInfo
import org.apache.texera.amber.core.storage.DocumentFactory
import org.apache.texera.amber.core.storage.model.VirtualDocument
import org.apache.texera.amber.core.tuple.Tuple
-import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity,
OperatorIdentity}
-import org.apache.texera.amber.core.workflow.{PortIdentity, WorkflowContext}
+import org.apache.texera.amber.core.virtualidentity.{
+ ExecutionIdentity,
+ OperatorIdentity,
+ WorkflowIdentity
+}
+import org.apache.texera.amber.core.workflow.{PortIdentity, WorkflowContext,
WorkflowSettings}
import org.apache.texera.amber.engine.architecture.controller.{
ControllerConfig,
ExecutionStateUpdate,
@@ -66,6 +70,22 @@ import org.apache.texera.workflow.{LogicalLink,
WorkflowCompiler}
object TestUtils {
+ /**
+ * A WorkflowContext whose workflow- and execution-id are both `id`. Each e2e
+ * suite passes a distinct id so its results land in a disjoint storage
+ * keyspace (`vfs:///wid/{id}/eid/{id}/...`) and disjoint DB rows, letting the
+ * suites run concurrently without colliding on the shared Iceberg catalog.
+ */
+ def workflowContext(
+ id: Int,
+ workflowSettings: WorkflowSettings = WorkflowSettings()
+ ): WorkflowContext =
+ new WorkflowContext(
+ workflowId = WorkflowIdentity(id.toLong),
+ executionId = ExecutionIdentity(id.toLong),
+ workflowSettings = workflowSettings
+ )
+
def buildWorkflow(
operators: List[LogicalOp],
links: List[LogicalLink],
@@ -186,53 +206,55 @@ object TestUtils {
)
}
- val testUser: User = {
+ // All fixture rows for one suite share `id` as uid/wid/vid/eid; the email is
+ // derived from it so concurrent suites don't collide on the unique email key.
+ def testUser(id: Int): User = {
val user = new User
- user.setUid(Integer.valueOf(1))
- user.setName("test_user")
+ user.setUid(Integer.valueOf(id))
+ user.setName(s"test_user_$id")
user.setRole(UserRoleEnum.ADMIN)
user.setPassword("123")
- user.setEmail("[email protected]")
+ user.setEmail(s"[email protected]")
user
}
- val testWorkflowEntry: WorkflowPojo = {
+ def testWorkflowEntry(id: Int): WorkflowPojo = {
val workflow = new WorkflowPojo
workflow.setName("test workflow")
- workflow.setWid(Integer.valueOf(1))
+ workflow.setWid(Integer.valueOf(id))
workflow.setContent("test workflow content")
workflow.setDescription("test description")
workflow
}
- val testWorkflowVersionEntry: WorkflowVersion = {
+ def testWorkflowVersionEntry(id: Int): WorkflowVersion = {
val workflowVersion = new WorkflowVersion
- workflowVersion.setWid(Integer.valueOf(1))
- workflowVersion.setVid(Integer.valueOf(1))
+ workflowVersion.setWid(Integer.valueOf(id))
+ workflowVersion.setVid(Integer.valueOf(id))
workflowVersion.setContent("test version content")
workflowVersion
}
- val testWorkflowExecutionEntry: WorkflowExecutions = {
+ def testWorkflowExecutionEntry(id: Int): WorkflowExecutions = {
val workflowExecution = new WorkflowExecutions
- workflowExecution.setEid(Integer.valueOf(1))
- workflowExecution.setVid(Integer.valueOf(1))
- workflowExecution.setUid(Integer.valueOf(1))
+ workflowExecution.setEid(Integer.valueOf(id))
+ workflowExecution.setVid(Integer.valueOf(id))
+ workflowExecution.setUid(Integer.valueOf(id))
workflowExecution.setStatus(3.toByte)
workflowExecution.setEnvironmentVersion("test engine")
workflowExecution
}
- def setUpWorkflowExecutionData(): Unit = {
+ def setUpWorkflowExecutionData(id: Int): Unit = {
val dslConfig = SqlServer.getInstance().context.configuration()
val userDao = new UserDao(dslConfig)
val workflowDao = new WorkflowDao(dslConfig)
val workflowExecutionsDao = new WorkflowExecutionsDao(dslConfig)
val workflowVersionDao = new WorkflowVersionDao(dslConfig)
- userDao.insert(testUser)
- workflowDao.insert(testWorkflowEntry)
- workflowVersionDao.insert(testWorkflowVersionEntry)
- workflowExecutionsDao.insert(testWorkflowExecutionEntry)
+ userDao.insert(testUser(id))
+ workflowDao.insert(testWorkflowEntry(id))
+ workflowVersionDao.insert(testWorkflowVersionEntry(id))
+ workflowExecutionsDao.insert(testWorkflowExecutionEntry(id))
}
/**
@@ -318,16 +340,16 @@ object TestUtils {
result
}
- def cleanupWorkflowExecutionData(): Unit = {
+ def cleanupWorkflowExecutionData(id: Int): Unit = {
val dslConfig = SqlServer.getInstance().context.configuration()
val userDao = new UserDao(dslConfig)
val workflowDao = new WorkflowDao(dslConfig)
val workflowExecutionsDao = new WorkflowExecutionsDao(dslConfig)
val workflowVersionDao = new WorkflowVersionDao(dslConfig)
- workflowExecutionsDao.deleteById(1)
- workflowVersionDao.deleteById(1)
- workflowDao.deleteById(1)
- userDao.deleteById(1)
+ workflowExecutionsDao.deleteById(id)
+ workflowVersionDao.deleteById(id)
+ workflowDao.deleteById(id)
+ userDao.deleteById(id)
}
}