This is an automated email from the ASF dual-hosted git repository.

github-merge-queue[bot] pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/main by this push:
     new dfa0434e70 refactor(test): extract shared run-and-read e2e harness into TestUtils (#5712)
dfa0434e70 is described below

commit dfa0434e70c256feb37a8d38c1d0bc134e789fdd
Author: Xinyuan Lin <[email protected]>
AuthorDate: Sun Jun 14 17:58:45 2026 -0700

    refactor(test): extract shared run-and-read e2e harness into TestUtils (#5712)
    
    ### What changes were proposed in this PR?
    
    Extracts the repeated "run a workflow and read its materialized results"
    boilerplate from the amber e2e specs into two reusable helpers on
    `TestUtils`:
    
    - `readMaterializedResults(executionId, operatorIds, extract)` — resolve
    + open each operator's external RESULT document and apply `extract` to
    the opened `VirtualDocument[Tuple]` (operators with no materialized
    output are skipped).
    - `runWorkflowAndReadResults(system, workflow, operatorIds, extract,
    completionTimeout)` — run a workflow to `COMPLETED` (a `FatalError`
    aborts and surfaces as the awaited exception), then read results via
    `readMaterializedResults`.
    
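    `DataProcessingSpec.executeWorkflow` now delegates to
    `runWorkflowAndReadTerminalResults`, a convenience wrapper over
    `runWorkflowAndReadResults` that reads every terminal operator's result
    as a `List[Tuple]`, instead of keeping its own inline copy. A matching
    `readMaterializedResults(workflow)` overload replaces the duplicated
    lookup block that already lived in `TestUtils`. The helpers are
    loop/state-agnostic — they only use existing core APIs
    (`DocumentFactory`, `VirtualDocument[Tuple]`, `AmberClient`,
    `ExecutionStateUpdate`, `FatalError`), so other e2e specs can adopt them
    too.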
    
    ### Any related issues, documentation, discussions?
    
    Resolves #5711 (sub-issue of #4442 "Introduce for loop"). Split out of
    #5700 to keep that PR reviewable, per @Xiao-zhen-Liu's
    [review](https://github.com/apache/texera/pull/4206#pullrequestreview-4482667715).
    
    ### How was this PR tested?
    
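    Refactor of existing e2e test infrastructure that preserves the
    success-path results; failure handling is tightened (client errors and
    result-read failures now fail the completion promise, and the client is
    always shut down). `WorkflowExecutionService/Test/compile` and
    `WorkflowExecutionService/scalafmtCheckAll` pass locally. The
    `@IntegrationTest` specs that exercise the harness (e.g.
    `DataProcessingSpec`) run in CI — they spawn Python workers and can't
    run on Windows.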
    
    ### Was this PR authored or co-authored using generative AI tooling?
    
    Co-authored with Claude Opus 4.8 in compliance with ASF.
---
 .../amber/engine/e2e/DataProcessingSpec.scala      |  58 +---------
 .../apache/texera/amber/engine/e2e/TestUtils.scala | 123 ++++++++++++++++-----
 2 files changed, 101 insertions(+), 80 deletions(-)

diff --git 
a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/DataProcessingSpec.scala
 
b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/DataProcessingSpec.scala
index d070fefb27..2606d9d656 100644
--- 
a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/DataProcessingSpec.scala
+++ 
b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/DataProcessingSpec.scala
@@ -22,10 +22,7 @@ package org.apache.texera.amber.engine.e2e
 import org.apache.pekko.actor.{ActorSystem, Props}
 import org.apache.pekko.testkit.{ImplicitSender, TestKit}
 import org.apache.pekko.util.Timeout
-import com.twitter.util.{Await, Duration, Promise}
 import org.apache.texera.amber.clustering.SingleNodeListener
-import org.apache.texera.amber.core.storage.DocumentFactory
-import org.apache.texera.amber.core.storage.model.VirtualDocument
 import org.apache.texera.amber.core.tuple.{AttributeType, Tuple}
 import org.apache.texera.amber.core.virtualidentity.OperatorIdentity
 import org.apache.texera.amber.core.workflow.{
@@ -35,19 +32,16 @@ import org.apache.texera.amber.core.workflow.{
   WorkflowSettings
 }
 import org.apache.texera.amber.engine.architecture.controller._
-import 
org.apache.texera.amber.engine.architecture.rpc.controlcommands.EmptyRequest
-import 
org.apache.texera.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState.COMPLETED
 import org.apache.texera.amber.engine.common.AmberRuntime
-import org.apache.texera.amber.engine.common.client.AmberClient
 import org.apache.texera.amber.engine.e2e.TestUtils.{
   buildWorkflow,
   cleanupWorkflowExecutionData,
   initiateTexeraDBForTestCases,
+  runWorkflowAndReadTerminalResults,
   setUpWorkflowExecutionData
 }
 import org.apache.texera.amber.operator.TestOperators
 import org.apache.texera.amber.operator.aggregate.AggregationFunction
-import 
org.apache.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource.getResultUriByLogicalPortId
 import org.apache.texera.workflow.LogicalLink
 import org.scalatest.flatspec.AnyFlatSpecLike
 import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, Outcome, Retries}
@@ -101,54 +95,8 @@ class DataProcessingSpec
     TestKit.shutdownActorSystem(system)
   }
 
-  def executeWorkflow(workflow: Workflow): Map[OperatorIdentity, List[Tuple]] 
= {
-    var results: Map[OperatorIdentity, List[Tuple]] = null
-    val client = new AmberClient(
-      system,
-      workflow.context,
-      workflow.physicalPlan,
-      ControllerConfig.default,
-      error => {}
-    )
-    val completion = Promise[Unit]()
-    client.registerCallback[FatalError](evt => {
-      completion.setException(evt.e)
-      client.shutdown()
-    })
-
-    client
-      .registerCallback[ExecutionStateUpdate](evt => {
-        if (evt.state == COMPLETED) {
-          results = workflow.logicalPlan.getTerminalOperatorIds
-            .filter(terminalOpId => {
-              val uri = getResultUriByLogicalPortId(
-                workflowContext.executionId,
-                terminalOpId,
-                PortIdentity()
-              )
-              uri.nonEmpty
-            })
-            .map(terminalOpId => {
-              val uri = getResultUriByLogicalPortId(
-                workflowContext.executionId,
-                terminalOpId,
-                PortIdentity()
-              ).get
-              terminalOpId -> DocumentFactory
-                .openDocument(uri)
-                ._1
-                .asInstanceOf[VirtualDocument[Tuple]]
-                .get()
-                .toList
-            })
-            .toMap
-          completion.setDone()
-        }
-      })
-    Await.result(client.controllerInterface.startWorkflow(EmptyRequest(), ()))
-    Await.result(completion, Duration.fromMinutes(1))
-    results
-  }
+  def executeWorkflow(workflow: Workflow): Map[OperatorIdentity, List[Tuple]] =
+    runWorkflowAndReadTerminalResults(system, workflow)
 
   "Engine" should "execute headerlessCsv workflow normally" in {
     val headerlessCsvOpDesc = TestOperators.headerlessSmallCsvScanOpDesc()
diff --git 
a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/TestUtils.scala 
b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/TestUtils.scala
index 9021765fc8..2a87fe3490 100644
--- a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/TestUtils.scala
+++ b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/TestUtils.scala
@@ -19,18 +19,19 @@
 
 package org.apache.texera.amber.engine.e2e
 
-import com.twitter.util.{Await, Duration, Promise, Return}
+import com.twitter.util.{Await, Duration, Promise, Return, Throw, Try}
 import org.apache.pekko.actor.ActorSystem
 import org.apache.texera.common.config.StorageConfig
 import org.apache.texera.amber.core.executor.OpExecInitInfo
 import org.apache.texera.amber.core.storage.DocumentFactory
 import org.apache.texera.amber.core.storage.model.VirtualDocument
 import org.apache.texera.amber.core.tuple.Tuple
-import org.apache.texera.amber.core.virtualidentity.OperatorIdentity
+import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, 
OperatorIdentity}
 import org.apache.texera.amber.core.workflow.{PortIdentity, WorkflowContext}
 import org.apache.texera.amber.engine.architecture.controller.{
   ControllerConfig,
   ExecutionStateUpdate,
+  FatalError,
   Workflow
 }
 import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{
@@ -78,6 +79,100 @@ object TestUtils {
     )
   }
 
+  /**
+    * Resolve and read each operator's external RESULT document at 
`executionId`,
+    * applying `extract` to the opened document. Operators with no external
+    * RESULT uri (e.g. one whose output wasn't materialized) are omitted. 
Shared
+    * by the e2e specs so the lookup-open-extract block doesn't drift between
+    * copies.
+    */
+  def readMaterializedResults[T](
+      executionId: ExecutionIdentity,
+      operatorIds: Iterable[OperatorIdentity],
+      extract: VirtualDocument[Tuple] => T
+  ): Map[OperatorIdentity, T] =
+    operatorIds.flatMap { opId =>
+      getResultUriByLogicalPortId(executionId, opId, PortIdentity()).map { uri 
=>
+        opId -> extract(
+          
DocumentFactory.openDocument(uri)._1.asInstanceOf[VirtualDocument[Tuple]]
+        )
+      }
+    }.toMap
+
+  /**
+    * Convenience over `readMaterializedResults` for the common case: read each
+    * terminal operator's result of `workflow` as a `List[Tuple]`.
+    */
+  def readMaterializedResults(workflow: Workflow): Map[OperatorIdentity, 
List[Tuple]] =
+    readMaterializedResults(
+      workflow.context.executionId,
+      workflow.logicalPlan.getTerminalOperatorIds,
+      _.get().toList
+    )
+
+  /**
+    * Run `workflow` to COMPLETED, then read the requested operators' 
materialized
+    * results via `readMaterializedResults`. A FatalError aborts the run and is
+    * surfaced as the exception from the completion await. Specs that drive the
+    * run differently (e.g. a pause/resume flow) read results directly inside
+    * their own completion callback instead.
+    */
+  def runWorkflowAndReadResults[T](
+      system: ActorSystem,
+      workflow: Workflow,
+      operatorIds: Iterable[OperatorIdentity],
+      extract: VirtualDocument[Tuple] => T,
+      completionTimeout: Duration = Duration.fromMinutes(1)
+  ): Map[OperatorIdentity, T] = {
+    // The Promise carries the result so completing the run and handing back 
the
+    // value are atomic. Every terminal path uses `updateIfEmpty`, so a second
+    // event (a late FatalError after COMPLETED, or a repeated state update)
+    // can't throw inside a callback and get swallowed -- which would otherwise
+    // mask the real failure as a timeout. A read failure inside the COMPLETED
+    // callback fails the Promise (via `Try`) instead of hanging, and
+    // `shutdown()` runs in a `finally` so a timeout or error can't leak the
+    // client's actors.
+    val completion = Promise[Map[OperatorIdentity, T]]()
+    val client = new AmberClient(
+      system,
+      workflow.context,
+      workflow.physicalPlan,
+      ControllerConfig.default,
+      e => completion.updateIfEmpty(Throw(e))
+    )
+    try {
+      client.registerCallback[FatalError](evt => 
completion.updateIfEmpty(Throw(evt.e)))
+      client.registerCallback[ExecutionStateUpdate](evt => {
+        if (evt.state == COMPLETED) {
+          completion.updateIfEmpty(
+            Try(readMaterializedResults(workflow.context.executionId, 
operatorIds, extract))
+          )
+        }
+      })
+      Await.result(client.controllerInterface.startWorkflow(EmptyRequest(), 
()))
+      Await.result(completion, completionTimeout)
+    } finally {
+      client.shutdown()
+    }
+  }
+
+  /**
+    * Convenience over `runWorkflowAndReadResults` for the common case: run
+    * `workflow` and read each terminal operator's result as a `List[Tuple]`.
+    */
+  def runWorkflowAndReadTerminalResults(
+      system: ActorSystem,
+      workflow: Workflow,
+      completionTimeout: Duration = Duration.fromMinutes(1)
+  ): Map[OperatorIdentity, List[Tuple]] =
+    runWorkflowAndReadResults(
+      system,
+      workflow,
+      workflow.logicalPlan.getTerminalOperatorIds,
+      _.get().toList,
+      completionTimeout
+    )
+
   /**
     * If a test case accesses the user system through singleton resources that 
cache the DSLContext (e.g., executes a
     * workflow, which accesses WorkflowExecutionsResource), we use a separate 
texera_db specifically for such test cases.
@@ -188,29 +283,7 @@ object TestUtils {
     var result: Map[OperatorIdentity, List[Tuple]] = null
     client.registerCallback[ExecutionStateUpdate](evt => {
       if (evt.state == COMPLETED) {
-        result = workflow.logicalPlan.getTerminalOperatorIds
-          .filter(terminalOpId => {
-            val uri = getResultUriByLogicalPortId(
-              workflow.context.executionId,
-              terminalOpId,
-              PortIdentity()
-            )
-            uri.nonEmpty
-          })
-          .map(terminalOpId => {
-            val uri = getResultUriByLogicalPortId(
-              workflow.context.executionId,
-              terminalOpId,
-              PortIdentity()
-            ).get
-            terminalOpId -> DocumentFactory
-              .openDocument(uri)
-              ._1
-              .asInstanceOf[VirtualDocument[Tuple]]
-              .get()
-              .toList
-          })
-          .toMap
+        result = readMaterializedResults(workflow)
         completion.setDone()
       }
     })

Reply via email to