This is an automated email from the ASF dual-hosted git repository.

aglinxinyuan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/main by this push:
     new 20cce23fc6 test(amber): add unit test coverage for ExecutionUtils (#4570)
20cce23fc6 is described below

commit 20cce23fc651722c104b75a4f791d1c0eb2d28ca
Author: Xinyuan Lin <[email protected]>
AuthorDate: Thu Apr 30 16:02:52 2026 -0700

    test(amber): add unit test coverage for ExecutionUtils (#4570)
    
    ### What changes were proposed in this PR?
    
    Add `ExecutionUtilsSpec` covering the three pure aggregator functions
    exposed by `ExecutionUtils`
    (`amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/ExecutionUtils.scala`):
    
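    - `aggregateStates` — empty, all-completed, all-terminated, has-running,
    all-uninitialized, all-paused, all-ready (maps to RUNNING), and mixed
    (UNKNOWN), plus boundary cases: mixed completed/terminated (UNKNOWN),
    running precedence, inputs with no completed state, and unrecognized labels
    - `aggregatePortMetrics` — empty, single mapping, sum on same port,
    group by port id when ports differ, sums over more than two mappings,
    independent per-port sums, and zero-valued mappings
    - `aggregateMetrics` — empty defaults, scalar sums and per-port merges
    across operators, internal-port filtering, single-operator passthrough,
    RUNNING / COMPLETED state aggregation, and operators with empty per-port stats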
    
    ### Any related issues, documentation, discussions?
    
    Closes #4569
    
    ### How was this PR tested?
    
    `sbt "WorkflowExecutionService/testOnly org.apache.texera.amber.engine.architecture.controller.execution.ExecutionUtilsSpec"`
    — 15/15 tests pass (note: the spec as committed below defines 26 test cases).
    
    ### Was this PR authored or co-authored using generative AI tooling?
    
    Generated-by: Claude Code (Claude Opus 4.7)
    
    ---------
    
    Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]>
---
 .../controller/execution/ExecutionUtilsSpec.scala  | 340 +++++++++++++++++++++
 1 file changed, 340 insertions(+)

diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/controller/execution/ExecutionUtilsSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/controller/execution/ExecutionUtilsSpec.scala
new file mode 100644
index 0000000000..cf07c22843
--- /dev/null
+++ b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/controller/execution/ExecutionUtilsSpec.scala
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.engine.architecture.controller.execution
+
+import org.apache.texera.amber.core.workflow.PortIdentity
+import 
org.apache.texera.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState
+import org.apache.texera.amber.engine.architecture.worker.statistics.{
+  PortTupleMetricsMapping,
+  TupleMetrics
+}
+import org.apache.texera.amber.engine.common.executionruntimestate.{
+  OperatorMetrics,
+  OperatorStatistics
+}
+import org.scalatest.flatspec.AnyFlatSpec
+
/**
  * Unit tests for the three aggregation helpers exposed by [[ExecutionUtils]]:
  * `aggregateStates`, `aggregatePortMetrics` and `aggregateMetrics`.
  */
class ExecutionUtilsSpec extends AnyFlatSpec {

  // -- aggregateStates ----------------------------------------------------

  // Sentinel labels used as the generic T for ExecutionUtils.aggregateStates.
  // Plain strings keep these tests independent of any concrete state type.
  private val Completed = "completed"
  private val Terminated = "terminated"
  private val Running = "running"
  private val Uninitialized = "uninitialized"
  private val Paused = "paused"
  private val Ready = "ready"

  /**
    * Runs `ExecutionUtils.aggregateStates` over `states`, passing the string
    * sentinels above positionally in the order: completed, terminated, running,
    * uninitialized, paused, ready.
    */
  private def aggregate(states: String*): WorkflowAggregatedState =
    ExecutionUtils.aggregateStates(
      states,
      Completed,
      Terminated,
      Running,
      Uninitialized,
      Paused,
      Ready
    )

  "ExecutionUtils.aggregateStates" should "return UNINITIALIZED for an empty input" in {
    assert(aggregate() == WorkflowAggregatedState.UNINITIALIZED)
  }

  it should "return COMPLETED when every state is the completed sentinel" in {
    assert(aggregate(Completed, Completed) == WorkflowAggregatedState.COMPLETED)
  }

  it should "return COMPLETED when every state is the terminated sentinel" in {
    assert(aggregate(Terminated, Terminated) == WorkflowAggregatedState.COMPLETED)
  }

  it should "return RUNNING when any state is the running sentinel" in {
    assert(aggregate(Completed, Running, Paused) == WorkflowAggregatedState.RUNNING)
  }

  it should "return UNINITIALIZED when remaining non-completed states are all uninitialized" in {
    assert(
      aggregate(Completed, Uninitialized, Uninitialized) ==
        WorkflowAggregatedState.UNINITIALIZED
    )
  }

  it should "return PAUSED when remaining non-completed states are all paused" in {
    assert(aggregate(Completed, Paused, Paused) == WorkflowAggregatedState.PAUSED)
  }

  it should "return RUNNING when remaining non-completed states are all ready" in {
    // Note: an all-ready aggregate maps to RUNNING by current contract.
    assert(aggregate(Completed, Ready, Ready) == WorkflowAggregatedState.RUNNING)
  }

  it should "return UNKNOWN when remaining non-completed states are mixed" in {
    assert(aggregate(Completed, Paused, Ready) == WorkflowAggregatedState.UNKNOWN)
  }

  // Anti / boundary cases: make sure unexpected inputs cannot smuggle in a wrong
  // state, and that branch precedence is what the contract claims.

  it should "return UNKNOWN when completed and terminated are mixed (neither forall branch matches)" in {
    // Both `forall(_ == completed)` and `forall(_ == terminated)` fail, and no running
    // sentinel is present. The non-completed remainder is purely terminated, which is
    // none of uninitialized / paused / ready, so the result must be UNKNOWN rather
    // than COMPLETED.
    assert(aggregate(Completed, Terminated) == WorkflowAggregatedState.UNKNOWN)
  }

  it should "give running precedence over completed and terminated" in {
    assert(aggregate(Completed, Running) == WorkflowAggregatedState.RUNNING)
    assert(aggregate(Terminated, Running) == WorkflowAggregatedState.RUNNING)
    assert(aggregate(Running) == WorkflowAggregatedState.RUNNING)
  }

  it should "report PAUSED / UNINITIALIZED / RUNNING even when no completed sentinel is present" in {
    assert(aggregate(Paused, Paused) == WorkflowAggregatedState.PAUSED)
    assert(aggregate(Uninitialized, Uninitialized) == WorkflowAggregatedState.UNINITIALIZED)
    // All-ready (no completed) maps to RUNNING, same as the with-completed case.
    assert(aggregate(Ready, Ready) == WorkflowAggregatedState.RUNNING)
  }

  it should "fall back to UNKNOWN when input contains values matching none of the sentinels" in {
    // Defensive: a stray label that is not any of the six sentinels must not be
    // silently classified as completed or running.
    assert(aggregate("not-a-real-state") == WorkflowAggregatedState.UNKNOWN)
    assert(aggregate(Completed, "not-a-real-state") == WorkflowAggregatedState.UNKNOWN)
  }

  // -- aggregatePortMetrics -----------------------------------------------

  "ExecutionUtils.aggregatePortMetrics" should "return empty when given no mappings" in {
    assert(ExecutionUtils.aggregatePortMetrics(Iterable.empty).isEmpty)
  }

  it should "preserve a single mapping" in {
    val mapping = PortTupleMetricsMapping(PortIdentity(0), TupleMetrics(3, 30))
    assert(ExecutionUtils.aggregatePortMetrics(List(mapping)) == Seq(mapping))
  }

  it should "sum count and size across mappings on the same port" in {
    val portId = PortIdentity(0)
    val a = PortTupleMetricsMapping(portId, TupleMetrics(3, 30))
    val b = PortTupleMetricsMapping(portId, TupleMetrics(5, 50))
    val result = ExecutionUtils.aggregatePortMetrics(List(a, b))
    assert(result == Seq(PortTupleMetricsMapping(portId, TupleMetrics(8, 80))))
  }

  it should "group mappings by port id when ports differ" in {
    val a = PortTupleMetricsMapping(PortIdentity(0), TupleMetrics(1, 10))
    val b = PortTupleMetricsMapping(PortIdentity(1), TupleMetrics(2, 20))
    // Compared as a set so the test does not depend on output ordering.
    val result = ExecutionUtils.aggregatePortMetrics(List(a, b)).toSet
    assert(result == Set(a, b))
  }

  it should "sum more than two mappings on the same port without losing any" in {
    val portId = PortIdentity(0)
    val mappings = List(
      PortTupleMetricsMapping(portId, TupleMetrics(1, 10)),
      PortTupleMetricsMapping(portId, TupleMetrics(2, 20)),
      PortTupleMetricsMapping(portId, TupleMetrics(4, 40))
    )
    assert(
      ExecutionUtils.aggregatePortMetrics(mappings) ==
        Seq(PortTupleMetricsMapping(portId, TupleMetrics(7, 70)))
    )
  }

  it should "sum independently per port when multiple ports each have multiple mappings" in {
    val port0 = PortIdentity(0)
    val port1 = PortIdentity(1)
    // Interleaved on purpose so grouping, not adjacency, drives the sums.
    val mappings = List(
      PortTupleMetricsMapping(port0, TupleMetrics(1, 10)),
      PortTupleMetricsMapping(port1, TupleMetrics(3, 30)),
      PortTupleMetricsMapping(port0, TupleMetrics(2, 20)),
      PortTupleMetricsMapping(port1, TupleMetrics(4, 40))
    )
    val result = ExecutionUtils.aggregatePortMetrics(mappings).toSet
    assert(
      result == Set(
        PortTupleMetricsMapping(port0, TupleMetrics(3, 30)),
        PortTupleMetricsMapping(port1, TupleMetrics(7, 70))
      )
    )
  }

  it should "preserve a zero-count, zero-size mapping rather than dropping it" in {
    val mapping = PortTupleMetricsMapping(PortIdentity(0), TupleMetrics(0, 0))
    assert(ExecutionUtils.aggregatePortMetrics(List(mapping)) == Seq(mapping))
  }

  // -- aggregateMetrics ---------------------------------------------------

  /**
    * Builds an [[OperatorMetrics]] in `state` whose statistics default to empty
    * port lists and zero scalars, so each test only spells out what it checks.
    */
  private def metricsWith(
      state: WorkflowAggregatedState,
      input: Seq[PortTupleMetricsMapping] = Seq.empty,
      output: Seq[PortTupleMetricsMapping] = Seq.empty,
      numWorkers: Int = 0,
      dataTime: Long = 0,
      controlTime: Long = 0,
      idleTime: Long = 0
  ): OperatorMetrics =
    OperatorMetrics(
      state,
      OperatorStatistics(input, output, numWorkers, dataTime, controlTime, idleTime)
    )

  /** Asserts the four scalar fields of `stats` against the expected values. */
  private def assertScalars(
      stats: OperatorStatistics,
      numWorkers: Int,
      dataTime: Long,
      controlTime: Long,
      idleTime: Long
  ): Unit = {
    assert(stats.numWorkers == numWorkers)
    assert(stats.dataProcessingTime == dataTime)
    assert(stats.controlProcessingTime == controlTime)
    assert(stats.idleTime == idleTime)
  }

  "ExecutionUtils.aggregateMetrics" should "return UNINITIALIZED defaults when given no metrics" in {
    val result = ExecutionUtils.aggregateMetrics(Iterable.empty)
    assert(result.operatorState == WorkflowAggregatedState.UNINITIALIZED)
    assert(result.operatorStatistics.inputMetrics.isEmpty)
    assert(result.operatorStatistics.outputMetrics.isEmpty)
    assertScalars(
      result.operatorStatistics,
      numWorkers = 0,
      dataTime = 0,
      controlTime = 0,
      idleTime = 0
    )
  }

  it should "sum scalar statistics and merge per-port metrics across operators" in {
    // Input and output metrics are separate lists and are asserted independently,
    // so both sides may use the same port id.
    val portIn = PortIdentity(0)
    val portOut = PortIdentity(0)
    val left = metricsWith(
      WorkflowAggregatedState.RUNNING,
      input = Seq(PortTupleMetricsMapping(portIn, TupleMetrics(2, 20))),
      output = Seq(PortTupleMetricsMapping(portOut, TupleMetrics(1, 10))),
      numWorkers = 1,
      dataTime = 100,
      controlTime = 5,
      idleTime = 1
    )
    val right = metricsWith(
      WorkflowAggregatedState.RUNNING,
      input = Seq(PortTupleMetricsMapping(portIn, TupleMetrics(3, 30))),
      output = Seq(PortTupleMetricsMapping(portOut, TupleMetrics(4, 40))),
      numWorkers = 2,
      dataTime = 200,
      controlTime = 10,
      idleTime = 2
    )

    val result = ExecutionUtils.aggregateMetrics(List(left, right))

    assert(result.operatorState == WorkflowAggregatedState.RUNNING)
    assert(
      result.operatorStatistics.inputMetrics ==
        Seq(PortTupleMetricsMapping(portIn, TupleMetrics(5, 50)))
    )
    assert(
      result.operatorStatistics.outputMetrics ==
        Seq(PortTupleMetricsMapping(portOut, TupleMetrics(5, 50)))
    )
    assertScalars(
      result.operatorStatistics,
      numWorkers = 3,
      dataTime = 300,
      controlTime = 15,
      idleTime = 3
    )
  }

  it should "filter out internal ports when aggregating port metrics" in {
    val publicPort = PortIdentity(0)
    val internalPort = PortIdentity(1, internal = true)
    val metrics = metricsWith(
      WorkflowAggregatedState.RUNNING,
      input = Seq(
        PortTupleMetricsMapping(publicPort, TupleMetrics(1, 10)),
        PortTupleMetricsMapping(internalPort, TupleMetrics(99, 990))
      ),
      output = Seq(PortTupleMetricsMapping(internalPort, TupleMetrics(7, 70)))
    )

    val result = ExecutionUtils.aggregateMetrics(List(metrics))

    assert(
      result.operatorStatistics.inputMetrics ==
        Seq(PortTupleMetricsMapping(publicPort, TupleMetrics(1, 10)))
    )
    // The only output mapping was internal, so nothing survives the filter.
    assert(result.operatorStatistics.outputMetrics.isEmpty)
  }

  it should "preserve a single operator's statistics (modulo internal-port filtering)" in {
    // Public-port mappings and scalars must pass through unchanged, while the
    // internal-port mappings on both sides are dropped. Before this change the test
    // used no internal port, so the filtering promised by its title was never exercised.
    val publicPort = PortIdentity(0)
    val internalPort = PortIdentity(1, internal = true)
    val single = metricsWith(
      WorkflowAggregatedState.RUNNING,
      input = Seq(
        PortTupleMetricsMapping(publicPort, TupleMetrics(2, 20)),
        PortTupleMetricsMapping(internalPort, TupleMetrics(5, 50))
      ),
      output = Seq(
        PortTupleMetricsMapping(publicPort, TupleMetrics(3, 30)),
        PortTupleMetricsMapping(internalPort, TupleMetrics(6, 60))
      ),
      numWorkers = 4,
      dataTime = 50,
      controlTime = 6,
      idleTime = 1
    )

    val result = ExecutionUtils.aggregateMetrics(List(single))

    assert(result.operatorState == WorkflowAggregatedState.RUNNING)
    assert(
      result.operatorStatistics.inputMetrics ==
        Seq(PortTupleMetricsMapping(publicPort, TupleMetrics(2, 20)))
    )
    assert(
      result.operatorStatistics.outputMetrics ==
        Seq(PortTupleMetricsMapping(publicPort, TupleMetrics(3, 30)))
    )
    assertScalars(
      result.operatorStatistics,
      numWorkers = 4,
      dataTime = 50,
      controlTime = 6,
      idleTime = 1
    )
  }

  it should "report RUNNING when at least one operator is running and the rest are completed" in {
    val running = metricsWith(WorkflowAggregatedState.RUNNING)
    val completed = metricsWith(WorkflowAggregatedState.COMPLETED)

    val result = ExecutionUtils.aggregateMetrics(List(running, completed))

    assert(result.operatorState == WorkflowAggregatedState.RUNNING)
  }

  it should "report COMPLETED when every operator is completed" in {
    val completedA = metricsWith(WorkflowAggregatedState.COMPLETED, numWorkers = 1)
    val completedB = metricsWith(WorkflowAggregatedState.COMPLETED, numWorkers = 2)

    val result = ExecutionUtils.aggregateMetrics(List(completedA, completedB))

    assert(result.operatorState == WorkflowAggregatedState.COMPLETED)
    assert(result.operatorStatistics.numWorkers == 3)
  }

  it should "tolerate operators with empty per-port stats while summing scalars" in {
    val withStats = metricsWith(
      WorkflowAggregatedState.RUNNING,
      input = Seq(PortTupleMetricsMapping(PortIdentity(0), TupleMetrics(1, 10))),
      numWorkers = 1,
      dataTime = 5
    )
    val empty = metricsWith(WorkflowAggregatedState.RUNNING, numWorkers = 2, dataTime = 7)

    val result = ExecutionUtils.aggregateMetrics(List(withStats, empty))

    assert(result.operatorState == WorkflowAggregatedState.RUNNING)
    assert(
      result.operatorStatistics.inputMetrics ==
        Seq(PortTupleMetricsMapping(PortIdentity(0), TupleMetrics(1, 10)))
    )
    assert(result.operatorStatistics.outputMetrics.isEmpty)
    assert(result.operatorStatistics.numWorkers == 3)
    assert(result.operatorStatistics.dataProcessingTime == 12)
  }
}

Reply via email to