This is an automated email from the ASF dual-hosted git repository. Xiao-zhen-Liu pushed a commit to branch xiaozhen-caching-prototype in repository https://gitbox.apache.org/repos/asf/texera.git
commit 227334f7e080cfbc850885c79202e9fa2b06e5b3 Author: Xiaozhen Liu <[email protected]> AuthorDate: Thu Jan 15 11:27:31 2026 -0800 feat(cache): fix status and stats aggregation for a logical operator with physical operators having mixed statuses. --- .../controller/execution/ExecutionUtils.scala | 33 ++++++++++++++++-- .../scheduling/RegionExecutionCoordinator.scala | 3 +- .../texera/web/service/ExecutionStatsService.scala | 39 ++++++++++++++-------- docs/operator-port-cache.md | 1 + .../workspace/service/joint-ui/joint-ui.service.ts | 4 ++- 5 files changed, 62 insertions(+), 18 deletions(-) diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/ExecutionUtils.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/ExecutionUtils.scala index 2b028a5137..b827499a06 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/ExecutionUtils.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/ExecutionUtils.scala @@ -82,6 +82,21 @@ object ExecutionUtils { ) } + /** + * Aggregates execution states into a workflow-level state. + * + * Priority rules: + * - Empty => UNINITIALIZED + * - All completed or all terminated => COMPLETED + * - All cached (if provided) => COMPLETED_FROM_CACHE + * - Any running => RUNNING + * - Otherwise, strip terminal states (completed/terminated/cached): + * - None left => COMPLETED (mixed terminal states) + * - All uninitialized => UNINITIALIZED + * - All paused => PAUSED + * - All ready => RUNNING + * - Else => UNKNOWN + */ def aggregateStates[T]( states: Iterable[T], completedState: T, @@ -103,7 +118,9 @@ object ExecutionUtils { val terminalStates = Set(completedState, terminatedState) ++ cachedState.toSet val unCompletedStates = states.filterNot(terminalStates.contains) - if (unCompletedStates.forall(_ == uninitializedState)) { + if (unCompletedStates.isEmpty) { + WorkflowAggregatedState.COMPLETED + } else if (unCompletedStates.forall(_ == uninitializedState)) { WorkflowAggregatedState.UNINITIALIZED } else if (unCompletedStates.forall(_ == pausedState)) { WorkflowAggregatedState.PAUSED @@ -115,6 +132,12 @@ object ExecutionUtils { } } + /** + * Aggregates tuple metrics per port, preserving unknown markers for cached inputs. + * + * A negative count/size indicates an unknown value (e.g., cached input ports), + * so any unknown value keeps the aggregated port metrics unknown. + */ def aggregatePortMetrics( metrics: Iterable[PortTupleMetricsMapping] ): Seq[PortTupleMetricsMapping] = { @@ -123,8 +146,12 @@ object ExecutionUtils { .view .map { case (portId, mappings) => - val totalCount = mappings.map(_.tupleMetrics.count).sum - val totalSize = mappings.map(_.tupleMetrics.size).sum + val hasUnknown = + mappings.exists(m => m.tupleMetrics.count < 0 || m.tupleMetrics.size < 0) + val totalCount = + if (hasUnknown) -1L else mappings.map(_.tupleMetrics.count).sum + val totalSize = + if (hasUnknown) -1L else mappings.map(_.tupleMetrics.size).sum PortTupleMetricsMapping(portId, TupleMetrics(totalCount, totalSize)) } .toSeq diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala index cc409ea098..e292cddf0b 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala @@ -146,7 +146,8 @@ class RegionExecutionCoordinator( } .toSeq val inputMetrics = op.inputPorts.keys - .map(pid => PortTupleMetricsMapping(pid, TupleMetrics(0L, 0L))) + // Use -1 to signal skipped/unknown input counts for cached operators. + .map(pid => PortTupleMetricsMapping(pid, TupleMetrics(-1L, -1L))) .toSeq val stats = OperatorMetrics( WorkflowAggregatedState.COMPLETED_FROM_CACHE, diff --git a/amber/src/main/scala/org/apache/texera/web/service/ExecutionStatsService.scala b/amber/src/main/scala/org/apache/texera/web/service/ExecutionStatsService.scala index 841c7112c7..59b1853a9d 100644 --- a/amber/src/main/scala/org/apache/texera/web/service/ExecutionStatsService.scala +++ b/amber/src/main/scala/org/apache/texera/web/service/ExecutionStatsService.scala @@ -93,12 +93,15 @@ class ExecutionStatsService( addSubscription( stateStore.statsStore.registerDiffHandler((oldState, newState) => { - // Update operator stats if any operator updates its stat + // Update operator stats if any operator updates its stat. if (newState.operatorInfo.toSet != oldState.operatorInfo.toSet) { Iterable( OperatorStatisticsUpdateEvent(newState.operatorInfo.collect { - case x => - val metrics = x._2 + case (operatorId, metrics) => + val inputCounts = metrics.operatorStatistics.inputMetrics.map(_.tupleMetrics.count) + val inputSizes = metrics.operatorStatistics.inputMetrics.map(_.tupleMetrics.size) + val outputCounts = metrics.operatorStatistics.outputMetrics.map(_.tupleMetrics.count) + val outputSizes = metrics.operatorStatistics.outputMetrics.map(_.tupleMetrics.size) val inMap = metrics.operatorStatistics.inputMetrics .map(pm => pm.portId.id.toString -> pm.tupleMetrics.count) .toMap @@ -106,20 +109,19 @@ class ExecutionStatsService( .map(pm => pm.portId.id.toString -> pm.tupleMetrics.count) .toMap - val res = OperatorAggregatedMetrics( + operatorId -> OperatorAggregatedMetrics( Utils.aggregatedStateToString(metrics.operatorState), - metrics.operatorStatistics.inputMetrics.map(_.tupleMetrics.count).sum, - metrics.operatorStatistics.inputMetrics.map(_.tupleMetrics.size).sum, + sumNonNegative(inputCounts), + sumNonNegative(inputSizes), inMap, - metrics.operatorStatistics.outputMetrics.map(_.tupleMetrics.count).sum, - metrics.operatorStatistics.outputMetrics.map(_.tupleMetrics.size).sum, + sumNonNegative(outputCounts), + sumNonNegative(outputSizes), outMap, metrics.operatorStatistics.numWorkers, metrics.operatorStatistics.dataProcessingTime, metrics.operatorStatistics.controlProcessingTime, metrics.operatorStatistics.idleTime ) - (x._1, res) }) ) } else { @@ -246,15 +248,19 @@ class ExecutionStatsService( try { operatorStatistics.foreach { case (operatorId, stat) => + val inputCounts = stat.operatorStatistics.inputMetrics.map(_.tupleMetrics.count) + val inputSizes = stat.operatorStatistics.inputMetrics.map(_.tupleMetrics.size) + val outputCounts = stat.operatorStatistics.outputMetrics.map(_.tupleMetrics.count) + val outputSizes = stat.operatorStatistics.outputMetrics.map(_.tupleMetrics.size) val runtimeStats = new Tuple( ResultSchema.runtimeStatisticsSchema, Array( operatorId, new java.sql.Timestamp(System.currentTimeMillis()), - stat.operatorStatistics.inputMetrics.map(_.tupleMetrics.count).sum, - stat.operatorStatistics.inputMetrics.map(_.tupleMetrics.size).sum, - stat.operatorStatistics.outputMetrics.map(_.tupleMetrics.count).sum, - stat.operatorStatistics.outputMetrics.map(_.tupleMetrics.size).sum, + sumNonNegative(inputCounts), + sumNonNegative(inputSizes), + sumNonNegative(outputCounts), + sumNonNegative(outputSizes), stat.operatorStatistics.dataProcessingTime, stat.operatorStatistics.controlProcessingTime, stat.operatorStatistics.idleTime, @@ -269,6 +275,13 @@ class ExecutionStatsService( } } + /** + * Sums non-negative metric values, treating negative sentinel values as unknown. + */ + private def sumNonNegative(values: Iterable[Long]): Long = { + values.filter(_ >= 0).sum + } + private[this] def registerCallbackOnWorkerAssignedUpdate(): Unit = { addSubscription( client diff --git a/docs/operator-port-cache.md b/docs/operator-port-cache.md index c27a088ac7..bde5981d9e 100644 --- a/docs/operator-port-cache.md +++ b/docs/operator-port-cache.md @@ -364,6 +364,7 @@ ExecutionCacheService ────→ upsertCachedOutput() OperatorPortCache - Frontend `OperatorState` enum includes `CompletedFromCache` - Operator visualization: blue fill color (`#1890ff`) for cached operators in `joint-ui.service.ts` - Port metrics display: cached input counts show `-`, and cached output ports without materialization show `-` + - Input ports fed by cached sub-operators (e.g., HashJoin build cached, probe executed) report unknown counts and render as `-` - Worker count label: cached operators show `from cache` instead of `#workers` - Region visualization: blue fill (`rgba(24,144,255,0.3)`) for cached regions in `workflow-editor.component.ts` - Region visibility: shared state via `WorkflowActionService.showRegion` ensures correct visibility when regions are created during execution diff --git a/frontend/src/app/workspace/service/joint-ui/joint-ui.service.ts b/frontend/src/app/workspace/service/joint-ui/joint-ui.service.ts index 69095ff3e0..c174e63958 100644 --- a/frontend/src/app/workspace/service/joint-ui/joint-ui.service.ts +++ b/frontend/src/app/workspace/service/joint-ui/joint-ui.service.ts @@ -366,7 +366,9 @@ export class JointUIService { originalName = portId; } - const labelText = isSkippedFromCache ? "-" : String(count ?? 0); + // Negative counts mark skipped/unknown inputs from cached sub-operators. + const isUnknownCount = count !== undefined && count < 0; + const labelText = isSkippedFromCache || isUnknownCount ? "-" : String(count ?? 0); element.portProp(portId, "attrs/.port-label/text", labelText); } });
