This is an automated email from the ASF dual-hosted git repository.
Yicong-Huang pushed a commit to branch release/v1.2
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/release/v1.2 by this push:
new 24f71f6b04 fix(test): isolate e2e suites by unique workflow/execution
id [release/v1.2 backport] (#5903)
24f71f6b04 is described below
commit 24f71f6b048d41612815791aee6b73ee6deeff05
Author: Matthew B. <[email protected]>
AuthorDate: Tue Jun 23 09:54:19 2026 -0700
fix(test): isolate e2e suites by unique workflow/execution id [release/v1.2
backport] (#5903)
### What changes were proposed in this PR?
- Backport of #5888 to `release/v1.2`, applied as `git cherry-pick -x
8803d084c`.
- Each amber e2e suite now gets a distinct id via a new
`TestUtils.workflowContext(id, ...)` helper (DataProcessingSpec=1,
PauseSpec=2, ReconfigurationSpec=3, ReconfigurationIntegrationSpec=4).
The DB fixture rows (user/workflow/version/execution, plus a per-id
email) are derived from that same id, so concurrent suites no longer
collide in the shared Iceberg keyspace or on the fixture rows' keys.
- One conflict resolved, in `TestUtils.scala` imports only: kept
release/v1.2's `org.apache.texera.amber.config.StorageConfig` path and
took the cherry-pick's added
`ExecutionIdentity`/`WorkflowIdentity`/`WorkflowSettings` imports needed
by the new helper. The four spec files applied cleanly.
- The `runWorkflowAndReadTerminalResults`/`readMaterializedResults`
helpers that exist on `main` came from a separate change not on
`release/v1.2`; #5888 does not touch them, so they are correctly absent
here.
### Any related issues, documentation, discussions?
Related to #5887
### How was this PR tested?
- Behavior is unchanged from #5888 (test-only isolation), so this PR
relies on the existing e2e suites, carried over verbatim. Reviewers can
run the integration suite with `sbt "amber/IntegrationTest/testOnly
*ReconfigurationIntegrationSpec"` and the unit e2e suites with `sbt
"amber/testOnly *DataProcessingSpec *PauseSpec *ReconfigurationSpec"`;
expect all green, with no Iceberg `CommitFailedException` keyspace
collisions across concurrently running suites.
- Local `sbt` could not load on `release/v1.2` (its `project/` lacks
`AddMetaInfLicenseFiles`, unrelated to this change), so full
compile/test is left to CI on this PR.
### Was this PR authored or co-authored using generative AI tooling?
Co-authored with Claude Opus 4.8, in compliance with the ASF generative tooling guidance.
---
.../e2e/ReconfigurationIntegrationSpec.scala | 15 ++---
.../amber/engine/e2e/DataProcessingSpec.scala | 13 ++--
.../apache/texera/amber/engine/e2e/PauseSpec.scala | 10 +--
.../amber/engine/e2e/ReconfigurationSpec.scala | 9 +--
.../apache/texera/amber/engine/e2e/TestUtils.scala | 72 ++++++++++++++--------
5 files changed, 74 insertions(+), 45 deletions(-)
diff --git
a/amber/src/test/integration/org/apache/texera/amber/engine/e2e/ReconfigurationIntegrationSpec.scala
b/amber/src/test/integration/org/apache/texera/amber/engine/e2e/ReconfigurationIntegrationSpec.scala
index 6f0936da28..5d2ed7e5e4 100644
---
a/amber/src/test/integration/org/apache/texera/amber/engine/e2e/ReconfigurationIntegrationSpec.scala
+++
b/amber/src/test/integration/org/apache/texera/amber/engine/e2e/ReconfigurationIntegrationSpec.scala
@@ -28,7 +28,7 @@ import org.apache.texera.amber.clustering.SingleNodeListener
import org.apache.texera.amber.core.executor.{OpExecInitInfo, OpExecWithCode}
import org.apache.texera.amber.core.tuple.Tuple
import org.apache.texera.amber.core.virtualidentity.OperatorIdentity
-import org.apache.texera.amber.core.workflow.{PortIdentity, WorkflowContext}
+import org.apache.texera.amber.core.workflow.PortIdentity
import org.apache.texera.amber.engine.architecture.controller.{
ControllerConfig,
ExecutionStateUpdate
@@ -80,14 +80,15 @@ class ReconfigurationIntegrationSpec
implicit val timeout: Timeout = Timeout(5.seconds)
val logger = Logger("ReconfigurationIntegrationSpecLogger")
- val ctx = new WorkflowContext()
+ private val specId = 4
+ val ctx = TestUtils.workflowContext(specId)
override protected def beforeEach(): Unit = {
- setUpWorkflowExecutionData()
+ setUpWorkflowExecutionData(specId)
}
override protected def afterEach(): Unit = {
- cleanupWorkflowExecutionData()
+ cleanupWorkflowExecutionData(specId)
}
override def beforeAll(): Unit = {
@@ -117,12 +118,12 @@ class ReconfigurationIntegrationSpec
*/
private def warmupOnce(): Unit = {
val warmupCap = Duration.fromSeconds(10)
- setUpWorkflowExecutionData()
+ setUpWorkflowExecutionData(specId)
var client: AmberClient = null
try {
val src = new TextInputSourceOpDesc()
src.textInput = "warmup"
- val warmupCtx = new WorkflowContext()
+ val warmupCtx = TestUtils.workflowContext(specId)
val workflow = buildWorkflow(List(src), List.empty, warmupCtx)
client = new AmberClient(
system,
@@ -150,7 +151,7 @@ class ReconfigurationIntegrationSpec
try client.shutdown()
catch { case _: Throwable => () }
}
- cleanupWorkflowExecutionData()
+ cleanupWorkflowExecutionData(specId)
}
}
diff --git
a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/DataProcessingSpec.scala
b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/DataProcessingSpec.scala
index d070fefb27..86e2c5c759 100644
---
a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/DataProcessingSpec.scala
+++
b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/DataProcessingSpec.scala
@@ -72,21 +72,24 @@ class DataProcessingSpec
implicit val timeout: Timeout = Timeout(5.seconds)
- val workflowContext: WorkflowContext = new WorkflowContext()
+ private val specId = 1
- val materializedWorkflowContext: WorkflowContext = new WorkflowContext(
- workflowSettings = WorkflowSettings(
+ val workflowContext: WorkflowContext = TestUtils.workflowContext(specId)
+
+ val materializedWorkflowContext: WorkflowContext = TestUtils.workflowContext(
+ specId,
+ WorkflowSettings(
dataTransferBatchSize = 400,
executionMode = ExecutionMode.MATERIALIZED
)
)
override protected def beforeEach(): Unit = {
- setUpWorkflowExecutionData()
+ setUpWorkflowExecutionData(specId)
}
override protected def afterEach(): Unit = {
- cleanupWorkflowExecutionData()
+ cleanupWorkflowExecutionData(specId)
}
override def beforeAll(): Unit = {
diff --git
a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/PauseSpec.scala
b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/PauseSpec.scala
index 2cc268608f..036caefa0a 100644
--- a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/PauseSpec.scala
+++ b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/PauseSpec.scala
@@ -25,7 +25,7 @@ import org.apache.pekko.util.Timeout
import com.twitter.util.{Await, Duration, Promise}
import com.typesafe.scalalogging.Logger
import org.apache.texera.amber.clustering.SingleNodeListener
-import org.apache.texera.amber.core.workflow.{PortIdentity, WorkflowContext}
+import org.apache.texera.amber.core.workflow.PortIdentity
import org.apache.texera.amber.engine.architecture.controller.{
ControllerConfig,
ExecutionStateUpdate
@@ -70,12 +70,14 @@ class PauseSpec
val logger = Logger("PauseSpecLogger")
+ private val specId = 2
+
override protected def beforeEach(): Unit = {
- setUpWorkflowExecutionData()
+ setUpWorkflowExecutionData(specId)
}
override protected def afterEach(): Unit = {
- cleanupWorkflowExecutionData()
+ cleanupWorkflowExecutionData(specId)
}
override def beforeAll(): Unit = {
@@ -95,7 +97,7 @@ class PauseSpec
links: List[LogicalLink]
): Unit = {
val workflow =
- TestUtils.buildWorkflow(operators, links, new WorkflowContext())
+ TestUtils.buildWorkflow(operators, links,
TestUtils.workflowContext(specId))
val client =
new AmberClient(
system,
diff --git
a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ReconfigurationSpec.scala
b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ReconfigurationSpec.scala
index 2cd3559736..6dfb23f6ac 100644
---
a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ReconfigurationSpec.scala
+++
b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ReconfigurationSpec.scala
@@ -27,7 +27,7 @@ import org.apache.texera.amber.clustering.SingleNodeListener
import org.apache.texera.amber.core.executor.OpExecInitInfo
import org.apache.texera.amber.core.tuple.Tuple
import org.apache.texera.amber.core.virtualidentity.OperatorIdentity
-import org.apache.texera.amber.core.workflow.{PortIdentity, WorkflowContext}
+import org.apache.texera.amber.core.workflow.PortIdentity
import org.apache.texera.amber.engine.common.AmberRuntime
import org.apache.texera.amber.engine.e2e.TestUtils.{
cleanupWorkflowExecutionData,
@@ -60,14 +60,15 @@ class ReconfigurationSpec
implicit val timeout: Timeout = Timeout(5.seconds)
val logger = Logger("ReconfigurationSpecLogger")
- val ctx = new WorkflowContext()
+ private val specId = 3
+ val ctx = TestUtils.workflowContext(specId)
override protected def beforeEach(): Unit = {
- setUpWorkflowExecutionData()
+ setUpWorkflowExecutionData(specId)
}
override protected def afterEach(): Unit = {
- cleanupWorkflowExecutionData()
+ cleanupWorkflowExecutionData(specId)
}
override def beforeAll(): Unit = {
diff --git
a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/TestUtils.scala
b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/TestUtils.scala
index bcc43b396b..113de090db 100644
--- a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/TestUtils.scala
+++ b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/TestUtils.scala
@@ -26,8 +26,12 @@ import org.apache.texera.amber.core.executor.OpExecInitInfo
import org.apache.texera.amber.core.storage.DocumentFactory
import org.apache.texera.amber.core.storage.model.VirtualDocument
import org.apache.texera.amber.core.tuple.Tuple
-import org.apache.texera.amber.core.virtualidentity.OperatorIdentity
-import org.apache.texera.amber.core.workflow.{PortIdentity, WorkflowContext}
+import org.apache.texera.amber.core.virtualidentity.{
+ ExecutionIdentity,
+ OperatorIdentity,
+ WorkflowIdentity
+}
+import org.apache.texera.amber.core.workflow.{PortIdentity, WorkflowContext,
WorkflowSettings}
import org.apache.texera.amber.engine.architecture.controller.{
ControllerConfig,
ExecutionStateUpdate,
@@ -65,6 +69,22 @@ import org.apache.texera.workflow.{LogicalLink,
WorkflowCompiler}
object TestUtils {
+ /**
+ * A WorkflowContext whose workflow- and execution-id are both `id`. Each e2e
+ * suite passes a distinct id so its results land in a disjoint storage
+ * keyspace (`vfs:///wid/{id}/eid/{id}/...`) and disjoint DB rows, letting the
+ * suites run concurrently without colliding on the shared Iceberg catalog.
+ */
+ def workflowContext(
+ id: Int,
+ workflowSettings: WorkflowSettings = WorkflowSettings()
+ ): WorkflowContext =
+ new WorkflowContext(
+ workflowId = WorkflowIdentity(id.toLong),
+ executionId = ExecutionIdentity(id.toLong),
+ workflowSettings = workflowSettings
+ )
+
def buildWorkflow(
operators: List[LogicalOp],
links: List[LogicalLink],
@@ -91,53 +111,55 @@ object TestUtils {
)
}
- val testUser: User = {
+ // All fixture rows for one suite share `id` as uid/wid/vid/eid; the email is
+ // derived from it so concurrent suites don't collide on the unique email key.
+ def testUser(id: Int): User = {
val user = new User
- user.setUid(Integer.valueOf(1))
- user.setName("test_user")
+ user.setUid(Integer.valueOf(id))
+ user.setName(s"test_user_$id")
user.setRole(UserRoleEnum.ADMIN)
user.setPassword("123")
- user.setEmail("[email protected]")
+ user.setEmail(s"[email protected]")
user
}
- val testWorkflowEntry: WorkflowPojo = {
+ def testWorkflowEntry(id: Int): WorkflowPojo = {
val workflow = new WorkflowPojo
workflow.setName("test workflow")
- workflow.setWid(Integer.valueOf(1))
+ workflow.setWid(Integer.valueOf(id))
workflow.setContent("test workflow content")
workflow.setDescription("test description")
workflow
}
- val testWorkflowVersionEntry: WorkflowVersion = {
+ def testWorkflowVersionEntry(id: Int): WorkflowVersion = {
val workflowVersion = new WorkflowVersion
- workflowVersion.setWid(Integer.valueOf(1))
- workflowVersion.setVid(Integer.valueOf(1))
+ workflowVersion.setWid(Integer.valueOf(id))
+ workflowVersion.setVid(Integer.valueOf(id))
workflowVersion.setContent("test version content")
workflowVersion
}
- val testWorkflowExecutionEntry: WorkflowExecutions = {
+ def testWorkflowExecutionEntry(id: Int): WorkflowExecutions = {
val workflowExecution = new WorkflowExecutions
- workflowExecution.setEid(Integer.valueOf(1))
- workflowExecution.setVid(Integer.valueOf(1))
- workflowExecution.setUid(Integer.valueOf(1))
+ workflowExecution.setEid(Integer.valueOf(id))
+ workflowExecution.setVid(Integer.valueOf(id))
+ workflowExecution.setUid(Integer.valueOf(id))
workflowExecution.setStatus(3.toByte)
workflowExecution.setEnvironmentVersion("test engine")
workflowExecution
}
- def setUpWorkflowExecutionData(): Unit = {
+ def setUpWorkflowExecutionData(id: Int): Unit = {
val dslConfig = SqlServer.getInstance().context.configuration()
val userDao = new UserDao(dslConfig)
val workflowDao = new WorkflowDao(dslConfig)
val workflowExecutionsDao = new WorkflowExecutionsDao(dslConfig)
val workflowVersionDao = new WorkflowVersionDao(dslConfig)
- userDao.insert(testUser)
- workflowDao.insert(testWorkflowEntry)
- workflowVersionDao.insert(testWorkflowVersionEntry)
- workflowExecutionsDao.insert(testWorkflowExecutionEntry)
+ userDao.insert(testUser(id))
+ workflowDao.insert(testWorkflowEntry(id))
+ workflowVersionDao.insert(testWorkflowVersionEntry(id))
+ workflowExecutionsDao.insert(testWorkflowExecutionEntry(id))
}
/**
@@ -245,16 +267,16 @@ object TestUtils {
result
}
- def cleanupWorkflowExecutionData(): Unit = {
+ def cleanupWorkflowExecutionData(id: Int): Unit = {
val dslConfig = SqlServer.getInstance().context.configuration()
val userDao = new UserDao(dslConfig)
val workflowDao = new WorkflowDao(dslConfig)
val workflowExecutionsDao = new WorkflowExecutionsDao(dslConfig)
val workflowVersionDao = new WorkflowVersionDao(dslConfig)
- workflowExecutionsDao.deleteById(1)
- workflowVersionDao.deleteById(1)
- workflowDao.deleteById(1)
- userDao.deleteById(1)
+ workflowExecutionsDao.deleteById(id)
+ workflowVersionDao.deleteById(id)
+ workflowDao.deleteById(id)
+ userDao.deleteById(id)
}
}