This is an automated email from the ASF dual-hosted git repository.

Xiao-zhen-Liu pushed a commit to branch xiaozhen-caching-prototype
in repository https://gitbox.apache.org/repos/asf/texera.git

commit 227334f7e080cfbc850885c79202e9fa2b06e5b3
Author: Xiaozhen Liu <[email protected]>
AuthorDate: Thu Jan 15 11:27:31 2026 -0800

    feat(cache): fix status and stats aggregation for logical operators whose physical operators have mixed statuses.
---
 .../controller/execution/ExecutionUtils.scala      | 33 ++++++++++++++++--
 .../scheduling/RegionExecutionCoordinator.scala    |  3 +-
 .../texera/web/service/ExecutionStatsService.scala | 39 ++++++++++++++--------
 docs/operator-port-cache.md                        |  1 +
 .../workspace/service/joint-ui/joint-ui.service.ts |  4 ++-
 5 files changed, 62 insertions(+), 18 deletions(-)

diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/ExecutionUtils.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/ExecutionUtils.scala
index 2b028a5137..b827499a06 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/ExecutionUtils.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/ExecutionUtils.scala
@@ -82,6 +82,21 @@ object ExecutionUtils {
     )
   }
 
+  /**
+    * Aggregates execution states into a workflow-level state.
+    *
+    * Priority rules:
+    * - Empty => UNINITIALIZED
+    * - All completed or all terminated => COMPLETED
+    * - All cached (if provided) => COMPLETED_FROM_CACHE
+    * - Any running => RUNNING
+    * - Otherwise, strip terminal states (completed/terminated/cached):
+    *   - None left => COMPLETED (mixed terminal states)
+    *   - All uninitialized => UNINITIALIZED
+    *   - All paused => PAUSED
+    *   - All ready => RUNNING
+    *   - Else => UNKNOWN
+    */
   def aggregateStates[T](
       states: Iterable[T],
       completedState: T,
@@ -103,7 +118,9 @@ object ExecutionUtils {
         val terminalStates =
           Set(completedState, terminatedState) ++ cachedState.toSet
         val unCompletedStates = states.filterNot(terminalStates.contains)
-        if (unCompletedStates.forall(_ == uninitializedState)) {
+        if (unCompletedStates.isEmpty) {
+          WorkflowAggregatedState.COMPLETED
+        } else if (unCompletedStates.forall(_ == uninitializedState)) {
           WorkflowAggregatedState.UNINITIALIZED
         } else if (unCompletedStates.forall(_ == pausedState)) {
           WorkflowAggregatedState.PAUSED
@@ -115,6 +132,12 @@ object ExecutionUtils {
     }
   }
 
+  /**
+    * Aggregates tuple metrics per port, preserving unknown markers for cached 
inputs.
+    *
+    * A negative count/size indicates an unknown value (e.g., cached input 
ports),
+    * so any unknown value keeps the aggregated port metrics unknown.
+    */
   def aggregatePortMetrics(
       metrics: Iterable[PortTupleMetricsMapping]
   ): Seq[PortTupleMetricsMapping] = {
@@ -123,8 +146,12 @@ object ExecutionUtils {
       .view
       .map {
         case (portId, mappings) =>
-          val totalCount = mappings.map(_.tupleMetrics.count).sum
-          val totalSize = mappings.map(_.tupleMetrics.size).sum
+          val hasUnknown =
+            mappings.exists(m => m.tupleMetrics.count < 0 || 
m.tupleMetrics.size < 0)
+          val totalCount =
+            if (hasUnknown) -1L else mappings.map(_.tupleMetrics.count).sum
+          val totalSize =
+            if (hasUnknown) -1L else mappings.map(_.tupleMetrics.size).sum
           PortTupleMetricsMapping(portId, TupleMetrics(totalCount, totalSize))
       }
       .toSeq
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
index cc409ea098..e292cddf0b 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
@@ -146,7 +146,8 @@ class RegionExecutionCoordinator(
         }
         .toSeq
       val inputMetrics = op.inputPorts.keys
-        .map(pid => PortTupleMetricsMapping(pid, TupleMetrics(0L, 0L)))
+        // Use -1 to signal skipped/unknown input counts for cached operators.
+        .map(pid => PortTupleMetricsMapping(pid, TupleMetrics(-1L, -1L)))
         .toSeq
       val stats = OperatorMetrics(
         WorkflowAggregatedState.COMPLETED_FROM_CACHE,
diff --git 
a/amber/src/main/scala/org/apache/texera/web/service/ExecutionStatsService.scala
 
b/amber/src/main/scala/org/apache/texera/web/service/ExecutionStatsService.scala
index 841c7112c7..59b1853a9d 100644
--- 
a/amber/src/main/scala/org/apache/texera/web/service/ExecutionStatsService.scala
+++ 
b/amber/src/main/scala/org/apache/texera/web/service/ExecutionStatsService.scala
@@ -93,12 +93,15 @@ class ExecutionStatsService(
 
   addSubscription(
     stateStore.statsStore.registerDiffHandler((oldState, newState) => {
-      // Update operator stats if any operator updates its stat
+      // Update operator stats if any operator updates its stat.
       if (newState.operatorInfo.toSet != oldState.operatorInfo.toSet) {
         Iterable(
           OperatorStatisticsUpdateEvent(newState.operatorInfo.collect {
-            case x =>
-              val metrics = x._2
+            case (operatorId, metrics) =>
+              val inputCounts = 
metrics.operatorStatistics.inputMetrics.map(_.tupleMetrics.count)
+              val inputSizes = 
metrics.operatorStatistics.inputMetrics.map(_.tupleMetrics.size)
+              val outputCounts = 
metrics.operatorStatistics.outputMetrics.map(_.tupleMetrics.count)
+              val outputSizes = 
metrics.operatorStatistics.outputMetrics.map(_.tupleMetrics.size)
               val inMap = metrics.operatorStatistics.inputMetrics
                 .map(pm => pm.portId.id.toString -> pm.tupleMetrics.count)
                 .toMap
@@ -106,20 +109,19 @@ class ExecutionStatsService(
                 .map(pm => pm.portId.id.toString -> pm.tupleMetrics.count)
                 .toMap
 
-              val res = OperatorAggregatedMetrics(
+              operatorId -> OperatorAggregatedMetrics(
                 Utils.aggregatedStateToString(metrics.operatorState),
-                
metrics.operatorStatistics.inputMetrics.map(_.tupleMetrics.count).sum,
-                
metrics.operatorStatistics.inputMetrics.map(_.tupleMetrics.size).sum,
+                sumNonNegative(inputCounts),
+                sumNonNegative(inputSizes),
                 inMap,
-                
metrics.operatorStatistics.outputMetrics.map(_.tupleMetrics.count).sum,
-                
metrics.operatorStatistics.outputMetrics.map(_.tupleMetrics.size).sum,
+                sumNonNegative(outputCounts),
+                sumNonNegative(outputSizes),
                 outMap,
                 metrics.operatorStatistics.numWorkers,
                 metrics.operatorStatistics.dataProcessingTime,
                 metrics.operatorStatistics.controlProcessingTime,
                 metrics.operatorStatistics.idleTime
               )
-              (x._1, res)
           })
         )
       } else {
@@ -246,15 +248,19 @@ class ExecutionStatsService(
     try {
       operatorStatistics.foreach {
         case (operatorId, stat) =>
+          val inputCounts = 
stat.operatorStatistics.inputMetrics.map(_.tupleMetrics.count)
+          val inputSizes = 
stat.operatorStatistics.inputMetrics.map(_.tupleMetrics.size)
+          val outputCounts = 
stat.operatorStatistics.outputMetrics.map(_.tupleMetrics.count)
+          val outputSizes = 
stat.operatorStatistics.outputMetrics.map(_.tupleMetrics.size)
           val runtimeStats = new Tuple(
             ResultSchema.runtimeStatisticsSchema,
             Array(
               operatorId,
               new java.sql.Timestamp(System.currentTimeMillis()),
-              
stat.operatorStatistics.inputMetrics.map(_.tupleMetrics.count).sum,
-              
stat.operatorStatistics.inputMetrics.map(_.tupleMetrics.size).sum,
-              
stat.operatorStatistics.outputMetrics.map(_.tupleMetrics.count).sum,
-              
stat.operatorStatistics.outputMetrics.map(_.tupleMetrics.size).sum,
+              sumNonNegative(inputCounts),
+              sumNonNegative(inputSizes),
+              sumNonNegative(outputCounts),
+              sumNonNegative(outputSizes),
               stat.operatorStatistics.dataProcessingTime,
               stat.operatorStatistics.controlProcessingTime,
               stat.operatorStatistics.idleTime,
@@ -269,6 +275,13 @@ class ExecutionStatsService(
     }
   }
 
+  /**
+    * Sums non-negative metric values, treating negative sentinel values as 
unknown.
+    */
+  private def sumNonNegative(values: Iterable[Long]): Long = {
+    values.filter(_ >= 0).sum
+  }
+
   private[this] def registerCallbackOnWorkerAssignedUpdate(): Unit = {
     addSubscription(
       client
diff --git a/docs/operator-port-cache.md b/docs/operator-port-cache.md
index c27a088ac7..bde5981d9e 100644
--- a/docs/operator-port-cache.md
+++ b/docs/operator-port-cache.md
@@ -364,6 +364,7 @@ ExecutionCacheService ────→ upsertCachedOutput()     
OperatorPortCache
   - Frontend `OperatorState` enum includes `CompletedFromCache`
   - Operator visualization: blue fill color (`#1890ff`) for cached operators 
in `joint-ui.service.ts`
   - Port metrics display: cached input counts show `-`, and cached output 
ports without materialization show `-`
+  - Input ports of cached physical sub-operators (e.g., a HashJoin whose build side is served from cache while its probe side executes) report unknown (`-1`) counts and render as `-`
   - Worker count label: cached operators show `from cache` instead of 
`#workers`
   - Region visualization: blue fill (`rgba(24,144,255,0.3)`) for cached 
regions in `workflow-editor.component.ts`
   - Region visibility: shared state via `WorkflowActionService.showRegion` 
ensures correct visibility when regions are created during execution
diff --git a/frontend/src/app/workspace/service/joint-ui/joint-ui.service.ts 
b/frontend/src/app/workspace/service/joint-ui/joint-ui.service.ts
index 69095ff3e0..c174e63958 100644
--- a/frontend/src/app/workspace/service/joint-ui/joint-ui.service.ts
+++ b/frontend/src/app/workspace/service/joint-ui/joint-ui.service.ts
@@ -366,7 +366,9 @@ export class JointUIService {
           originalName = portId;
         }
 
-        const labelText = isSkippedFromCache ? "-" : String(count ?? 0);
+        // Negative counts mark skipped/unknown inputs from cached 
sub-operators.
+        const isUnknownCount = count !== undefined && count < 0;
+        const labelText = isSkippedFromCache || isUnknownCount ? "-" : 
String(count ?? 0);
         element.portProp(portId, "attrs/.port-label/text", labelText);
       }
     });

Reply via email to