This is an automated email from the ASF dual-hosted git repository.
aglinxinyuan pushed a commit to branch xinyuan-loop-feb
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/xinyuan-loop-feb by this push:
new 304493066d refactor(test): extract shared run-and-read e2e harness into TestUtils
304493066d is described below
commit 304493066ded6f130a61c59b8ce4b47eea4b633b
Author: Xinyuan Lin <[email protected]>
AuthorDate: Sat Jun 13 15:52:41 2026 -0700
refactor(test): extract shared run-and-read e2e harness into TestUtils
The "run a workflow to COMPLETED, then read each operator's materialized
RESULT document" skeleton was copy-pasted three times:
DataProcessingSpec.executeWorkflow, TestUtils.shouldReconfigure, and
LoopIntegrationSpec's materialized-row-count helper.
Extract two shared helpers in TestUtils:
- readMaterializedResults(executionId, operatorIds, extract): resolve each
  operator's external RESULT uri, open the document, apply extract; omit
  operators with no uri.
- runWorkflowAndReadResults(system, workflow, operatorIds, extract,
  completionTimeout): run to COMPLETED (a FatalError aborts the run via the
  completion await), then read.
Route all three callers through them:
- DataProcessingSpec.executeWorkflow ->
  runWorkflowAndReadResults(..., _.get().toList)
- LoopIntegrationSpec -> runWorkflowAndReadResults(..., _.getCount)
  (now keyed by OperatorIdentity, not the id string)
- shouldReconfigure -> readMaterializedResults for its read block
  (it keeps its own pause/reconfigure/resume run loop)
Drops the now-dead imports from both specs. Net ~50 fewer lines and one copy
of the harness instead of three.
Verified locally: DataProcessingSpec + ReconfigurationSpec (18 tests) pass via
the shared helpers; LoopIntegrationSpec uses the identical harness and is
verified in the amber-integration CI job.
---
.../amber/engine/e2e/LoopIntegrationSpec.scala | 78 ++++---------------
.../amber/engine/e2e/DataProcessingSpec.scala | 59 ++------------
.../apache/texera/amber/engine/e2e/TestUtils.scala | 91 ++++++++++++++++------
3 files changed, 89 insertions(+), 139 deletions(-)
diff --git
a/amber/src/test/integration/org/apache/texera/amber/engine/e2e/LoopIntegrationSpec.scala
b/amber/src/test/integration/org/apache/texera/amber/engine/e2e/LoopIntegrationSpec.scala
index 58c2d01aa2..13933ab951 100644
---
a/amber/src/test/integration/org/apache/texera/amber/engine/e2e/LoopIntegrationSpec.scala
+++
b/amber/src/test/integration/org/apache/texera/amber/engine/e2e/LoopIntegrationSpec.scala
@@ -19,30 +19,24 @@
package org.apache.texera.amber.engine.e2e
-import com.twitter.util.{Await, Duration, Promise}
+import com.twitter.util.Duration
import org.apache.pekko.actor.{ActorSystem, Props}
import org.apache.pekko.testkit.{ImplicitSender, TestKit}
import org.apache.pekko.util.Timeout
import org.apache.texera.amber.clustering.SingleNodeListener
-import org.apache.texera.amber.core.storage.DocumentFactory
-import org.apache.texera.amber.core.storage.model.VirtualDocument
-import org.apache.texera.amber.core.tuple.Tuple
+import org.apache.texera.amber.core.virtualidentity.OperatorIdentity
import org.apache.texera.amber.core.workflow.{
ExecutionMode,
PortIdentity,
WorkflowContext,
WorkflowSettings
}
-import
org.apache.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource
-import org.apache.texera.amber.engine.architecture.controller._
-import
org.apache.texera.amber.engine.architecture.rpc.controlcommands.EmptyRequest
-import
org.apache.texera.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState.COMPLETED
import org.apache.texera.amber.engine.common.AmberRuntime
-import org.apache.texera.amber.engine.common.client.AmberClient
import org.apache.texera.amber.engine.e2e.TestUtils.{
buildWorkflow,
cleanupWorkflowExecutionData,
initiateTexeraDBForTestCases,
+ runWorkflowAndReadResults,
setUpWorkflowExecutionData
}
import org.apache.texera.amber.operator.LogicalOp
@@ -121,62 +115,22 @@ class LoopIntegrationSpec
)
/**
- * Run the loop workflow to completion and return each operator's
- * materialized RESULT-table row count, keyed by logical op id.
- *
- * Read inside the `COMPLETED` callback while the engine and storage
- * singletons are still alive (the same point `DataProcessingSpec` reads its
- * results), looking up each operator's external RESULT uri the way the
- * frontend does. Operators whose uri is absent, or whose document can't be
- * opened, are omitted (tolerated, not fatal). Failure messages below
- * include the full map so an unexpected count is visible.
+ * Run the loop workflow to completion and return each operator's
materialized
+ * RESULT-table row count, keyed by operator id. Delegates to the shared
+ * `TestUtils.runWorkflowAndReadResults` harness (a correct loop terminates
+ * within the 3-minute deadline; a broken one hangs until it).
*/
private def runAndGetMaterializedRowCounts(
operators: List[LogicalOp],
links: List[LogicalLink]
- ): Map[String, Long] = {
- val workflow = buildWorkflow(operators, links, materializedContext())
- val eid = workflow.context.executionId
- val client = new AmberClient(
+ ): Map[OperatorIdentity, Long] =
+ runWorkflowAndReadResults(
system,
- workflow.context,
- workflow.physicalPlan,
- ControllerConfig.default,
- _ => {}
+ buildWorkflow(operators, links, materializedContext()),
+ operators.map(_.operatorIdentifier),
+ _.getCount,
+ Duration.fromMinutes(3)
)
- val completion = Promise[Unit]()
- var materializedCounts: Map[String, Long] = Map.empty
- client.registerCallback[FatalError](evt => {
- completion.setException(evt.e)
- client.shutdown()
- })
- client.registerCallback[ExecutionStateUpdate](evt => {
- if (evt.state == COMPLETED) {
- materializedCounts = operators.flatMap { op =>
- WorkflowExecutionsResource
- .getResultUriByLogicalPortId(eid, op.operatorIdentifier,
PortIdentity())
- .flatMap { uri =>
- scala.util
- .Try(
- DocumentFactory
- .openDocument(uri)
- ._1
- .asInstanceOf[VirtualDocument[Tuple]]
- .getCount
- )
- .toOption
- .map(count => op.operatorIdentifier.id -> count)
- }
- }.toMap
- completion.setDone()
- }
- })
- Await.result(client.controllerInterface.startWorkflow(EmptyRequest(), ()))
- // A correct loop terminates; a broken one hangs until this deadline.
- Await.result(completion, Duration.fromMinutes(3))
- client.shutdown()
- materializedCounts
- }
private def textInput(text: String): TextInputSourceOpDesc = {
val op = new TextInputSourceOpDesc()
@@ -214,7 +168,7 @@ class LoopIntegrationSpec
// (outermost) LoopEnd is an identity pass-through that accumulates every
// iteration and never resets, so its materialized result holds all 3 rows.
// An off-by-one counter bug that still terminated would land on 2 or 4.
- val endRows = materialized.getOrElse(end.operatorIdentifier.id, -1L)
+ val endRows = materialized.getOrElse(end.operatorIdentifier, -1L)
assert(
endRows == 3,
s"single LoopEnd must accumulate all 3 iterations in its materialized " +
@@ -257,8 +211,8 @@ class LoopIntegrationSpec
// result holds only the final outer iteration's 3 inner rows -- not 9.
// The inner == 3 assertion is the one that fails against the pre-fix code
// (where the inner reset never fired and it accumulated all 9).
- val outerRows = materialized.getOrElse(outerEnd.operatorIdentifier.id, -1L)
- val innerRows = materialized.getOrElse(innerEnd.operatorIdentifier.id, -1L)
+ val outerRows = materialized.getOrElse(outerEnd.operatorIdentifier, -1L)
+ val innerRows = materialized.getOrElse(innerEnd.operatorIdentifier, -1L)
assert(
outerRows == 9,
s"outer LoopEnd must accumulate all 9 inner-iteration rows: " +
diff --git
a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/DataProcessingSpec.scala
b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/DataProcessingSpec.scala
index d070fefb27..7a3d1022d8 100644
---
a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/DataProcessingSpec.scala
+++
b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/DataProcessingSpec.scala
@@ -22,10 +22,7 @@ package org.apache.texera.amber.engine.e2e
import org.apache.pekko.actor.{ActorSystem, Props}
import org.apache.pekko.testkit.{ImplicitSender, TestKit}
import org.apache.pekko.util.Timeout
-import com.twitter.util.{Await, Duration, Promise}
import org.apache.texera.amber.clustering.SingleNodeListener
-import org.apache.texera.amber.core.storage.DocumentFactory
-import org.apache.texera.amber.core.storage.model.VirtualDocument
import org.apache.texera.amber.core.tuple.{AttributeType, Tuple}
import org.apache.texera.amber.core.virtualidentity.OperatorIdentity
import org.apache.texera.amber.core.workflow.{
@@ -35,19 +32,16 @@ import org.apache.texera.amber.core.workflow.{
WorkflowSettings
}
import org.apache.texera.amber.engine.architecture.controller._
-import
org.apache.texera.amber.engine.architecture.rpc.controlcommands.EmptyRequest
-import
org.apache.texera.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState.COMPLETED
import org.apache.texera.amber.engine.common.AmberRuntime
-import org.apache.texera.amber.engine.common.client.AmberClient
import org.apache.texera.amber.engine.e2e.TestUtils.{
buildWorkflow,
cleanupWorkflowExecutionData,
initiateTexeraDBForTestCases,
+ runWorkflowAndReadResults,
setUpWorkflowExecutionData
}
import org.apache.texera.amber.operator.TestOperators
import org.apache.texera.amber.operator.aggregate.AggregationFunction
-import
org.apache.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource.getResultUriByLogicalPortId
import org.apache.texera.workflow.LogicalLink
import org.scalatest.flatspec.AnyFlatSpecLike
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, Outcome, Retries}
@@ -101,54 +95,13 @@ class DataProcessingSpec
TestKit.shutdownActorSystem(system)
}
- def executeWorkflow(workflow: Workflow): Map[OperatorIdentity, List[Tuple]]
= {
- var results: Map[OperatorIdentity, List[Tuple]] = null
- val client = new AmberClient(
+ def executeWorkflow(workflow: Workflow): Map[OperatorIdentity, List[Tuple]] =
+ runWorkflowAndReadResults(
system,
- workflow.context,
- workflow.physicalPlan,
- ControllerConfig.default,
- error => {}
+ workflow,
+ workflow.logicalPlan.getTerminalOperatorIds,
+ _.get().toList
)
- val completion = Promise[Unit]()
- client.registerCallback[FatalError](evt => {
- completion.setException(evt.e)
- client.shutdown()
- })
-
- client
- .registerCallback[ExecutionStateUpdate](evt => {
- if (evt.state == COMPLETED) {
- results = workflow.logicalPlan.getTerminalOperatorIds
- .filter(terminalOpId => {
- val uri = getResultUriByLogicalPortId(
- workflowContext.executionId,
- terminalOpId,
- PortIdentity()
- )
- uri.nonEmpty
- })
- .map(terminalOpId => {
- val uri = getResultUriByLogicalPortId(
- workflowContext.executionId,
- terminalOpId,
- PortIdentity()
- ).get
- terminalOpId -> DocumentFactory
- .openDocument(uri)
- ._1
- .asInstanceOf[VirtualDocument[Tuple]]
- .get()
- .toList
- })
- .toMap
- completion.setDone()
- }
- })
- Await.result(client.controllerInterface.startWorkflow(EmptyRequest(), ()))
- Await.result(completion, Duration.fromMinutes(1))
- results
- }
"Engine" should "execute headerlessCsv workflow normally" in {
val headerlessCsvOpDesc = TestOperators.headerlessSmallCsvScanOpDesc()
diff --git
a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/TestUtils.scala
b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/TestUtils.scala
index bcc43b396b..a235f82e80 100644
--- a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/TestUtils.scala
+++ b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/TestUtils.scala
@@ -26,11 +26,12 @@ import org.apache.texera.amber.core.executor.OpExecInitInfo
import org.apache.texera.amber.core.storage.DocumentFactory
import org.apache.texera.amber.core.storage.model.VirtualDocument
import org.apache.texera.amber.core.tuple.Tuple
-import org.apache.texera.amber.core.virtualidentity.OperatorIdentity
+import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity,
OperatorIdentity}
import org.apache.texera.amber.core.workflow.{PortIdentity, WorkflowContext}
import org.apache.texera.amber.engine.architecture.controller.{
ControllerConfig,
ExecutionStateUpdate,
+ FatalError,
Workflow
}
import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{
@@ -78,6 +79,66 @@ object TestUtils {
)
}
+ /**
+ * Resolve and read each operator's external RESULT document at
`executionId`,
+ * applying `extract` to the opened document. Operators with no external
+ * RESULT uri (e.g. one whose output wasn't materialized) are omitted.
Shared
+ * by the e2e specs so the lookup-open-extract block doesn't drift between
+ * copies.
+ */
+ def readMaterializedResults[T](
+ executionId: ExecutionIdentity,
+ operatorIds: Iterable[OperatorIdentity],
+ extract: VirtualDocument[Tuple] => T
+ ): Map[OperatorIdentity, T] =
+ operatorIds.flatMap { opId =>
+ getResultUriByLogicalPortId(executionId, opId, PortIdentity()).map { uri
=>
+ opId -> extract(
+
DocumentFactory.openDocument(uri)._1.asInstanceOf[VirtualDocument[Tuple]]
+ )
+ }
+ }.toMap
+
+ /**
+ * Run `workflow` to COMPLETED, then read the requested operators'
materialized
+ * results via `readMaterializedResults`. A FatalError aborts the run and is
+ * surfaced as the exception from the completion await. Shared by the simple
+ * "run and read" e2e specs (e.g. DataProcessingSpec, LoopIntegrationSpec);
+ * specs that drive the run differently (e.g. reconfiguration's
pause/resume)
+ * call `readMaterializedResults` directly inside their own completion
callback.
+ */
+ def runWorkflowAndReadResults[T](
+ system: ActorSystem,
+ workflow: Workflow,
+ operatorIds: Iterable[OperatorIdentity],
+ extract: VirtualDocument[Tuple] => T,
+ completionTimeout: Duration = Duration.fromMinutes(1)
+ ): Map[OperatorIdentity, T] = {
+ val client = new AmberClient(
+ system,
+ workflow.context,
+ workflow.physicalPlan,
+ ControllerConfig.default,
+ _ => {}
+ )
+ val completion = Promise[Unit]()
+ var results: Map[OperatorIdentity, T] = Map.empty
+ client.registerCallback[FatalError](evt => {
+ completion.setException(evt.e)
+ client.shutdown()
+ })
+ client.registerCallback[ExecutionStateUpdate](evt => {
+ if (evt.state == COMPLETED) {
+ results = readMaterializedResults(workflow.context.executionId,
operatorIds, extract)
+ completion.setDone()
+ }
+ })
+ Await.result(client.controllerInterface.startWorkflow(EmptyRequest(), ()))
+ Await.result(completion, completionTimeout)
+ client.shutdown()
+ results
+ }
+
/**
* If a test case accesses the user system through singleton resources that
cache the DSLContext (e.g., executes a
* workflow, which accesses WorkflowExecutionsResource), we use a separate
texera_db specifically for such test cases.
@@ -188,29 +249,11 @@ object TestUtils {
var result: Map[OperatorIdentity, List[Tuple]] = null
client.registerCallback[ExecutionStateUpdate](evt => {
if (evt.state == COMPLETED) {
- result = workflow.logicalPlan.getTerminalOperatorIds
- .filter(terminalOpId => {
- val uri = getResultUriByLogicalPortId(
- workflow.context.executionId,
- terminalOpId,
- PortIdentity()
- )
- uri.nonEmpty
- })
- .map(terminalOpId => {
- val uri = getResultUriByLogicalPortId(
- workflow.context.executionId,
- terminalOpId,
- PortIdentity()
- ).get
- terminalOpId -> DocumentFactory
- .openDocument(uri)
- ._1
- .asInstanceOf[VirtualDocument[Tuple]]
- .get()
- .toList
- })
- .toMap
+ result = readMaterializedResults(
+ workflow.context.executionId,
+ workflow.logicalPlan.getTerminalOperatorIds,
+ _.get().toList
+ )
completion.setDone()
}
})