This is an automated email from the ASF dual-hosted git repository.
Yicong-Huang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new 635f0cf88c test(amber): warm up ReconfigurationIntegrationSpec before
timed assertions (#4947)
635f0cf88c is described below
commit 635f0cf88c009b06882720638a406b2ffd80a98c
Author: Yicong Huang <[email protected]>
AuthorDate: Tue May 5 17:25:31 2026 -0700
test(amber): warm up ReconfigurationIntegrationSpec before timed assertions
(#4947)
### What changes were proposed in this PR?
Add a `warmupOnce()` step to `ReconfigurationIntegrationSpec.beforeAll`
that runs a trivial pure-Scala `TextInputSourceOpDesc` workflow before
the timed test cases. This pays the JVM JIT + class-loading + pekko
first-touch bill before the first
`Await.result(client.controllerInterface.startWorkflow(...), 5.seconds)`
runs in `TestUtils.shouldReconfigure`, which is where the suite has been
intermittently timing out.
The warmup is pure Scala (no Python), hard-capped at 10 seconds, and
defensively wrapped: any exception is logged and swallowed,
`client.shutdown()` and `cleanupWorkflowExecutionData()` always run, so
warmup itself can never hang the suite. The existing `Retries` mixin
still backs up individual test cases.
### Any related issues, documentation, discussions?
Closes #4946.
### How was this PR tested?
`sbt WorkflowExecutionService/Test/compile` clean; `sbt
WorkflowExecutionService/scalafmtCheckAll` clean. CI `amber-integration`
job is the real test surface.
### Was this PR authored or co-authored using generative AI tooling?
Generated-by: Claude Opus 4.7 (Claude Code)
---
.../e2e/ReconfigurationIntegrationSpec.scala | 62 ++++++++++++++++++++++
1 file changed, 62 insertions(+)
diff --git
a/amber/src/test/integration/org/apache/texera/amber/engine/e2e/ReconfigurationIntegrationSpec.scala
b/amber/src/test/integration/org/apache/texera/amber/engine/e2e/ReconfigurationIntegrationSpec.scala
index 768caf079f..acadb9154c 100644
---
a/amber/src/test/integration/org/apache/texera/amber/engine/e2e/ReconfigurationIntegrationSpec.scala
+++
b/amber/src/test/integration/org/apache/texera/amber/engine/e2e/ReconfigurationIntegrationSpec.scala
@@ -19,6 +19,7 @@
package org.apache.texera.amber.engine.e2e
+import com.twitter.util.{Await, Duration, Promise, Return}
import com.typesafe.scalalogging.Logger
import org.apache.pekko.actor.{ActorSystem, Props}
import org.apache.pekko.testkit.{ImplicitSender, TestKit}
@@ -28,12 +29,21 @@ import
org.apache.texera.amber.core.executor.{OpExecInitInfo, OpExecWithCode}
import org.apache.texera.amber.core.tuple.Tuple
import org.apache.texera.amber.core.virtualidentity.OperatorIdentity
import org.apache.texera.amber.core.workflow.{PortIdentity, WorkflowContext}
+import org.apache.texera.amber.engine.architecture.controller.{
+ ControllerConfig,
+ ExecutionStateUpdate
+}
+import
org.apache.texera.amber.engine.architecture.rpc.controlcommands.EmptyRequest
+import
org.apache.texera.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState.COMPLETED
import org.apache.texera.amber.engine.common.AmberRuntime
+import org.apache.texera.amber.engine.common.client.AmberClient
import org.apache.texera.amber.engine.e2e.TestUtils.{
+ buildWorkflow,
cleanupWorkflowExecutionData,
initiateTexeraDBForTestCases,
setUpWorkflowExecutionData
}
+import org.apache.texera.amber.operator.source.scan.text.TextInputSourceOpDesc
import org.apache.texera.amber.operator.{LogicalOp, TestOperators}
import org.apache.texera.amber.tags.IntegrationTest
import org.apache.texera.workflow.LogicalLink
@@ -86,12 +96,64 @@ class ReconfigurationIntegrationSpec
// Explicitly load the JDBC driver to avoid flaky CI failures.
Class.forName("org.postgresql.Driver")
initiateTexeraDBForTestCases()
+ warmupOnce()
}
override def afterAll(): Unit = {
TestKit.shutdownActorSystem(system)
}
+  /**
+    * Run a trivial pure-Scala workflow (a single TextInput source) once before
+    * the timed tests start, so the first 5-second `startWorkflow` await in
+    * [[TestUtils.shouldReconfigure]] doesn't have to absorb JVM JIT
+    * warmup, pekko dispatcher first-touch, and `RegionExecutionCoordinator`
+    * class loading.
+    *
+    * Both awaits (workflow start and workflow completion) share a single
+    * 10-second deadline, so the waiting done here is capped at 10 seconds in
+    * total rather than 10 seconds per await.
+    *
+    * Best-effort: if the warmup times out or throws a non-fatal exception,
+    * the failure is logged and the suite continues — the existing `Retries`
+    * mixin still backs up individual test cases. Fatal errors (as classified
+    * by `scala.util.control.NonFatal`, e.g. `OutOfMemoryError` or
+    * `InterruptedException`) are deliberately not swallowed.
+    *
+    * The client, if one was created, is shut down and
+    * `cleanupWorkflowExecutionData()` is called regardless of the outcome.
+    */
+  private def warmupOnce(): Unit = {
+    val warmupCap = Duration.fromSeconds(10)
+    // Single deadline shared by every await below, so the cap holds in total.
+    val deadlineNanos = System.nanoTime() + warmupCap.inNanoseconds
+    // Time left until the deadline, clamped at zero so an exhausted budget
+    // turns into an immediate timeout instead of a negative duration.
+    def remaining: Duration =
+      Duration.fromNanoseconds(math.max(0L, deadlineNanos - System.nanoTime()))
+    setUpWorkflowExecutionData()
+    // Stays None until the client is constructed, so `finally` only shuts
+    // down a client that actually exists.
+    var client: Option[AmberClient] = None
+    try {
+      val src = new TextInputSourceOpDesc()
+      src.textInput = "warmup"
+      val warmupCtx = new WorkflowContext()
+      val workflow = buildWorkflow(List(src), List.empty, warmupCtx)
+      val warmupClient = new AmberClient(
+        system,
+        workflow.context,
+        workflow.physicalPlan,
+        ControllerConfig.default,
+        _ => {}
+      )
+      client = Some(warmupClient)
+      // Completed by the callback once the controller reports COMPLETED.
+      val completion = Promise[Unit]()
+      warmupClient.registerCallback[ExecutionStateUpdate](evt => {
+        if (evt.state == COMPLETED) completion.updateIfEmpty(Return(()))
+      })
+      Await.result(
+        warmupClient.controllerInterface.startWorkflow(EmptyRequest(), ()),
+        remaining
+      )
+      Await.result(completion, remaining)
+    } catch {
+      case scala.util.control.NonFatal(e) =>
+        // `$e` rather than `e.getMessage`: the message alone can be null and
+        // hides the exception type (timeout vs. genuine failure).
+        logger.warn(
+          s"warmup workflow failed or did not finish within ${warmupCap}; " +
+            s"tests will run cold and rely on Retries: $e"
+        )
+    } finally {
+      client.foreach { c =>
+        // Shutdown is best-effort; a failure here must not mask the outcome above.
+        try c.shutdown()
+        catch { case scala.util.control.NonFatal(_) => () }
+      }
+      cleanupWorkflowExecutionData()
+    }
+  }
+
// Thin wrapper around the shared TestUtils helper so call sites below stay
// ctx/system-implicit. The actual workflow-driver logic lives in TestUtils
// and is reused by ReconfigurationSpec.