This is an automated email from the ASF dual-hosted git repository.

aglinxinyuan pushed a commit to branch xinyuan-loop-feb
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/xinyuan-loop-feb by this push:
     new 304493066d refactor(test): extract shared run-and-read e2e harness into TestUtils
304493066d is described below

commit 304493066ded6f130a61c59b8ce4b47eea4b633b
Author: Xinyuan Lin <[email protected]>
AuthorDate: Sat Jun 13 15:52:41 2026 -0700

    refactor(test): extract shared run-and-read e2e harness into TestUtils
    
    The "run a workflow to COMPLETED, then read each operator's materialized
    RESULT document" skeleton was copy-pasted three times:
    DataProcessingSpec.executeWorkflow, TestUtils.shouldReconfigure, and
    LoopIntegrationSpec's materialized-row-count helper.
    
    Extract two shared helpers in TestUtils:
    - readMaterializedResults(executionId, operatorIds, extract): resolve each
      operator's external RESULT uri, open the document, apply extract; omit
      operators with no uri.
    - runWorkflowAndReadResults(system, workflow, operatorIds, extract, completionTimeout):
      run to COMPLETED (FatalError aborts via the completion await), then read.
    
    Route all three callers through them:
    - DataProcessingSpec.executeWorkflow -> runWorkflowAndReadResults(..., _.get().toList)
    - LoopIntegrationSpec -> runWorkflowAndReadResults(..., _.getCount)
      (keyed by OperatorIdentity now, not the id string)
    - shouldReconfigure -> readMaterializedResults for its read block (it keeps
      its own pause/reconfigure/resume run loop)
    
    Drops the now-dead imports from both specs. Net ~50 fewer lines and one copy
    of the harness instead of three.
    
    Verified locally: DataProcessingSpec + ReconfigurationSpec (18 tests) pass
    via the shared helpers; LoopIntegrationSpec uses the identical harness and
    is verified in the amber-integration CI job.
---
 .../amber/engine/e2e/LoopIntegrationSpec.scala     | 78 ++++---------------
 .../amber/engine/e2e/DataProcessingSpec.scala      | 59 ++------------
 .../apache/texera/amber/engine/e2e/TestUtils.scala | 91 ++++++++++++++++------
 3 files changed, 89 insertions(+), 139 deletions(-)

diff --git 
a/amber/src/test/integration/org/apache/texera/amber/engine/e2e/LoopIntegrationSpec.scala
 
b/amber/src/test/integration/org/apache/texera/amber/engine/e2e/LoopIntegrationSpec.scala
index 58c2d01aa2..13933ab951 100644
--- 
a/amber/src/test/integration/org/apache/texera/amber/engine/e2e/LoopIntegrationSpec.scala
+++ 
b/amber/src/test/integration/org/apache/texera/amber/engine/e2e/LoopIntegrationSpec.scala
@@ -19,30 +19,24 @@
 
 package org.apache.texera.amber.engine.e2e
 
-import com.twitter.util.{Await, Duration, Promise}
+import com.twitter.util.Duration
 import org.apache.pekko.actor.{ActorSystem, Props}
 import org.apache.pekko.testkit.{ImplicitSender, TestKit}
 import org.apache.pekko.util.Timeout
 import org.apache.texera.amber.clustering.SingleNodeListener
-import org.apache.texera.amber.core.storage.DocumentFactory
-import org.apache.texera.amber.core.storage.model.VirtualDocument
-import org.apache.texera.amber.core.tuple.Tuple
+import org.apache.texera.amber.core.virtualidentity.OperatorIdentity
 import org.apache.texera.amber.core.workflow.{
   ExecutionMode,
   PortIdentity,
   WorkflowContext,
   WorkflowSettings
 }
-import 
org.apache.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource
-import org.apache.texera.amber.engine.architecture.controller._
-import 
org.apache.texera.amber.engine.architecture.rpc.controlcommands.EmptyRequest
-import 
org.apache.texera.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState.COMPLETED
 import org.apache.texera.amber.engine.common.AmberRuntime
-import org.apache.texera.amber.engine.common.client.AmberClient
 import org.apache.texera.amber.engine.e2e.TestUtils.{
   buildWorkflow,
   cleanupWorkflowExecutionData,
   initiateTexeraDBForTestCases,
+  runWorkflowAndReadResults,
   setUpWorkflowExecutionData
 }
 import org.apache.texera.amber.operator.LogicalOp
@@ -121,62 +115,22 @@ class LoopIntegrationSpec
     )
 
   /**
-    * Run the loop workflow to completion and return each operator's
-    * materialized RESULT-table row count, keyed by logical op id.
-    *
-    * Read inside the `COMPLETED` callback while the engine and storage
-    * singletons are still alive (the same point `DataProcessingSpec` reads its
-    * results), looking up each operator's external RESULT uri the way the
-    * frontend does. Operators whose uri is absent, or whose document can't be
-    * opened, are omitted (tolerated, not fatal). Failure messages below
-    * include the full map so an unexpected count is visible.
+    * Run the loop workflow to completion and return each operator's 
materialized
+    * RESULT-table row count, keyed by operator id. Delegates to the shared
+    * `TestUtils.runWorkflowAndReadResults` harness (a correct loop terminates
+    * within the 3-minute deadline; a broken one hangs until it).
     */
   private def runAndGetMaterializedRowCounts(
       operators: List[LogicalOp],
       links: List[LogicalLink]
-  ): Map[String, Long] = {
-    val workflow = buildWorkflow(operators, links, materializedContext())
-    val eid = workflow.context.executionId
-    val client = new AmberClient(
+  ): Map[OperatorIdentity, Long] =
+    runWorkflowAndReadResults(
       system,
-      workflow.context,
-      workflow.physicalPlan,
-      ControllerConfig.default,
-      _ => {}
+      buildWorkflow(operators, links, materializedContext()),
+      operators.map(_.operatorIdentifier),
+      _.getCount,
+      Duration.fromMinutes(3)
     )
-    val completion = Promise[Unit]()
-    var materializedCounts: Map[String, Long] = Map.empty
-    client.registerCallback[FatalError](evt => {
-      completion.setException(evt.e)
-      client.shutdown()
-    })
-    client.registerCallback[ExecutionStateUpdate](evt => {
-      if (evt.state == COMPLETED) {
-        materializedCounts = operators.flatMap { op =>
-          WorkflowExecutionsResource
-            .getResultUriByLogicalPortId(eid, op.operatorIdentifier, 
PortIdentity())
-            .flatMap { uri =>
-              scala.util
-                .Try(
-                  DocumentFactory
-                    .openDocument(uri)
-                    ._1
-                    .asInstanceOf[VirtualDocument[Tuple]]
-                    .getCount
-                )
-                .toOption
-                .map(count => op.operatorIdentifier.id -> count)
-            }
-        }.toMap
-        completion.setDone()
-      }
-    })
-    Await.result(client.controllerInterface.startWorkflow(EmptyRequest(), ()))
-    // A correct loop terminates; a broken one hangs until this deadline.
-    Await.result(completion, Duration.fromMinutes(3))
-    client.shutdown()
-    materializedCounts
-  }
 
   private def textInput(text: String): TextInputSourceOpDesc = {
     val op = new TextInputSourceOpDesc()
@@ -214,7 +168,7 @@ class LoopIntegrationSpec
     // (outermost) LoopEnd is an identity pass-through that accumulates every
     // iteration and never resets, so its materialized result holds all 3 rows.
     // An off-by-one counter bug that still terminated would land on 2 or 4.
-    val endRows = materialized.getOrElse(end.operatorIdentifier.id, -1L)
+    val endRows = materialized.getOrElse(end.operatorIdentifier, -1L)
     assert(
       endRows == 3,
       s"single LoopEnd must accumulate all 3 iterations in its materialized " +
@@ -257,8 +211,8 @@ class LoopIntegrationSpec
     // result holds only the final outer iteration's 3 inner rows -- not 9.
     // The inner == 3 assertion is the one that fails against the pre-fix code
     // (where the inner reset never fired and it accumulated all 9).
-    val outerRows = materialized.getOrElse(outerEnd.operatorIdentifier.id, -1L)
-    val innerRows = materialized.getOrElse(innerEnd.operatorIdentifier.id, -1L)
+    val outerRows = materialized.getOrElse(outerEnd.operatorIdentifier, -1L)
+    val innerRows = materialized.getOrElse(innerEnd.operatorIdentifier, -1L)
     assert(
       outerRows == 9,
       s"outer LoopEnd must accumulate all 9 inner-iteration rows: " +
diff --git 
a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/DataProcessingSpec.scala
 
b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/DataProcessingSpec.scala
index d070fefb27..7a3d1022d8 100644
--- 
a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/DataProcessingSpec.scala
+++ 
b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/DataProcessingSpec.scala
@@ -22,10 +22,7 @@ package org.apache.texera.amber.engine.e2e
 import org.apache.pekko.actor.{ActorSystem, Props}
 import org.apache.pekko.testkit.{ImplicitSender, TestKit}
 import org.apache.pekko.util.Timeout
-import com.twitter.util.{Await, Duration, Promise}
 import org.apache.texera.amber.clustering.SingleNodeListener
-import org.apache.texera.amber.core.storage.DocumentFactory
-import org.apache.texera.amber.core.storage.model.VirtualDocument
 import org.apache.texera.amber.core.tuple.{AttributeType, Tuple}
 import org.apache.texera.amber.core.virtualidentity.OperatorIdentity
 import org.apache.texera.amber.core.workflow.{
@@ -35,19 +32,16 @@ import org.apache.texera.amber.core.workflow.{
   WorkflowSettings
 }
 import org.apache.texera.amber.engine.architecture.controller._
-import 
org.apache.texera.amber.engine.architecture.rpc.controlcommands.EmptyRequest
-import 
org.apache.texera.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState.COMPLETED
 import org.apache.texera.amber.engine.common.AmberRuntime
-import org.apache.texera.amber.engine.common.client.AmberClient
 import org.apache.texera.amber.engine.e2e.TestUtils.{
   buildWorkflow,
   cleanupWorkflowExecutionData,
   initiateTexeraDBForTestCases,
+  runWorkflowAndReadResults,
   setUpWorkflowExecutionData
 }
 import org.apache.texera.amber.operator.TestOperators
 import org.apache.texera.amber.operator.aggregate.AggregationFunction
-import 
org.apache.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource.getResultUriByLogicalPortId
 import org.apache.texera.workflow.LogicalLink
 import org.scalatest.flatspec.AnyFlatSpecLike
 import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, Outcome, Retries}
@@ -101,54 +95,13 @@ class DataProcessingSpec
     TestKit.shutdownActorSystem(system)
   }
 
-  def executeWorkflow(workflow: Workflow): Map[OperatorIdentity, List[Tuple]] 
= {
-    var results: Map[OperatorIdentity, List[Tuple]] = null
-    val client = new AmberClient(
+  def executeWorkflow(workflow: Workflow): Map[OperatorIdentity, List[Tuple]] =
+    runWorkflowAndReadResults(
       system,
-      workflow.context,
-      workflow.physicalPlan,
-      ControllerConfig.default,
-      error => {}
+      workflow,
+      workflow.logicalPlan.getTerminalOperatorIds,
+      _.get().toList
     )
-    val completion = Promise[Unit]()
-    client.registerCallback[FatalError](evt => {
-      completion.setException(evt.e)
-      client.shutdown()
-    })
-
-    client
-      .registerCallback[ExecutionStateUpdate](evt => {
-        if (evt.state == COMPLETED) {
-          results = workflow.logicalPlan.getTerminalOperatorIds
-            .filter(terminalOpId => {
-              val uri = getResultUriByLogicalPortId(
-                workflowContext.executionId,
-                terminalOpId,
-                PortIdentity()
-              )
-              uri.nonEmpty
-            })
-            .map(terminalOpId => {
-              val uri = getResultUriByLogicalPortId(
-                workflowContext.executionId,
-                terminalOpId,
-                PortIdentity()
-              ).get
-              terminalOpId -> DocumentFactory
-                .openDocument(uri)
-                ._1
-                .asInstanceOf[VirtualDocument[Tuple]]
-                .get()
-                .toList
-            })
-            .toMap
-          completion.setDone()
-        }
-      })
-    Await.result(client.controllerInterface.startWorkflow(EmptyRequest(), ()))
-    Await.result(completion, Duration.fromMinutes(1))
-    results
-  }
 
   "Engine" should "execute headerlessCsv workflow normally" in {
     val headerlessCsvOpDesc = TestOperators.headerlessSmallCsvScanOpDesc()
diff --git 
a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/TestUtils.scala 
b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/TestUtils.scala
index bcc43b396b..a235f82e80 100644
--- a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/TestUtils.scala
+++ b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/TestUtils.scala
@@ -26,11 +26,12 @@ import org.apache.texera.amber.core.executor.OpExecInitInfo
 import org.apache.texera.amber.core.storage.DocumentFactory
 import org.apache.texera.amber.core.storage.model.VirtualDocument
 import org.apache.texera.amber.core.tuple.Tuple
-import org.apache.texera.amber.core.virtualidentity.OperatorIdentity
+import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, 
OperatorIdentity}
 import org.apache.texera.amber.core.workflow.{PortIdentity, WorkflowContext}
 import org.apache.texera.amber.engine.architecture.controller.{
   ControllerConfig,
   ExecutionStateUpdate,
+  FatalError,
   Workflow
 }
 import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{
@@ -78,6 +79,66 @@ object TestUtils {
     )
   }
 
+  /**
+    * Resolve and read each operator's external RESULT document at 
`executionId`,
+    * applying `extract` to the opened document. Operators with no external
+    * RESULT uri (e.g. one whose output wasn't materialized) are omitted. 
Shared
+    * by the e2e specs so the lookup-open-extract block doesn't drift between
+    * copies.
+    */
+  def readMaterializedResults[T](
+      executionId: ExecutionIdentity,
+      operatorIds: Iterable[OperatorIdentity],
+      extract: VirtualDocument[Tuple] => T
+  ): Map[OperatorIdentity, T] =
+    operatorIds.flatMap { opId =>
+      getResultUriByLogicalPortId(executionId, opId, PortIdentity()).map { uri 
=>
+        opId -> extract(
+          
DocumentFactory.openDocument(uri)._1.asInstanceOf[VirtualDocument[Tuple]]
+        )
+      }
+    }.toMap
+
+  /**
+    * Run `workflow` to COMPLETED, then read the requested operators' 
materialized
+    * results via `readMaterializedResults`. A FatalError aborts the run and is
+    * surfaced as the exception from the completion await. Shared by the simple
+    * "run and read" e2e specs (e.g. DataProcessingSpec, LoopIntegrationSpec);
+    * specs that drive the run differently (e.g. reconfiguration's 
pause/resume)
+    * call `readMaterializedResults` directly inside their own completion 
callback.
+    */
+  def runWorkflowAndReadResults[T](
+      system: ActorSystem,
+      workflow: Workflow,
+      operatorIds: Iterable[OperatorIdentity],
+      extract: VirtualDocument[Tuple] => T,
+      completionTimeout: Duration = Duration.fromMinutes(1)
+  ): Map[OperatorIdentity, T] = {
+    val client = new AmberClient(
+      system,
+      workflow.context,
+      workflow.physicalPlan,
+      ControllerConfig.default,
+      _ => {}
+    )
+    val completion = Promise[Unit]()
+    var results: Map[OperatorIdentity, T] = Map.empty
+    client.registerCallback[FatalError](evt => {
+      completion.setException(evt.e)
+      client.shutdown()
+    })
+    client.registerCallback[ExecutionStateUpdate](evt => {
+      if (evt.state == COMPLETED) {
+        results = readMaterializedResults(workflow.context.executionId, 
operatorIds, extract)
+        completion.setDone()
+      }
+    })
+    Await.result(client.controllerInterface.startWorkflow(EmptyRequest(), ()))
+    Await.result(completion, completionTimeout)
+    client.shutdown()
+    results
+  }
+
   /**
     * If a test case accesses the user system through singleton resources that 
cache the DSLContext (e.g., executes a
     * workflow, which accesses WorkflowExecutionsResource), we use a separate 
texera_db specifically for such test cases.
@@ -188,29 +249,11 @@ object TestUtils {
     var result: Map[OperatorIdentity, List[Tuple]] = null
     client.registerCallback[ExecutionStateUpdate](evt => {
       if (evt.state == COMPLETED) {
-        result = workflow.logicalPlan.getTerminalOperatorIds
-          .filter(terminalOpId => {
-            val uri = getResultUriByLogicalPortId(
-              workflow.context.executionId,
-              terminalOpId,
-              PortIdentity()
-            )
-            uri.nonEmpty
-          })
-          .map(terminalOpId => {
-            val uri = getResultUriByLogicalPortId(
-              workflow.context.executionId,
-              terminalOpId,
-              PortIdentity()
-            ).get
-            terminalOpId -> DocumentFactory
-              .openDocument(uri)
-              ._1
-              .asInstanceOf[VirtualDocument[Tuple]]
-              .get()
-              .toList
-          })
-          .toMap
+        result = readMaterializedResults(
+          workflow.context.executionId,
+          workflow.logicalPlan.getTerminalOperatorIds,
+          _.get().toList
+        )
         completion.setDone()
       }
     })

Reply via email to