This is an automated email from the ASF dual-hosted git repository. Xiao-zhen-Liu pushed a commit to branch xiaozhen-caching-prototype in repository https://gitbox.apache.org/repos/asf/texera.git
commit c3fd3b1b535e94e68034b7d113e9c0b1e24a1738 Author: Xiaozhen Liu <[email protected]> AuthorDate: Mon Jun 1 21:31:17 2026 -0700 fix(amber): fix incorrectly resolved merge conflicts. --- .../controller/CacheReusePreSchedulingStep.scala | 23 +++++- .../WorkerExecutionCompletedHandler.scala | 5 +- .../scheduling/RegionExecutionCoordinator.scala | 5 +- .../scheduling/WorkflowExecutionCoordinator.scala | 87 ++++++++++------------ .../cache-panel/cache-panel.component.ts | 20 +++++ .../context-menu/context-menu.component.html | 8 +- .../workflow-editor/workflow-editor.component.ts | 21 ++---- .../workspace/service/joint-ui/joint-ui.service.ts | 38 ---------- 8 files changed, 93 insertions(+), 114 deletions(-) diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/CacheReusePreSchedulingStep.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/CacheReusePreSchedulingStep.scala index 122eadf2da..14cafded9d 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/CacheReusePreSchedulingStep.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/CacheReusePreSchedulingStep.scala @@ -19,6 +19,7 @@ package org.apache.texera.amber.engine.architecture.controller +import org.apache.texera.amber.core.storage.VFSURIFactory import org.apache.texera.amber.core.virtualidentity.PhysicalOpIdentity import org.apache.texera.amber.core.workflow.{ CachedOutput, @@ -199,6 +200,22 @@ object CacheReusePreSchedulingStep { linksPrunedPlan.getSubPlan(executeOperators) } + /** + * Cache entries persist the `/result` leaf URI, but scheduling port configs and the input-port + * materialization readers expect the port *base* URI: they derive `resultURI`/`stateURI` from it + * via `VFSURIFactory`. Reconstruct the base from a cached `/result` URI so reuse-only ports line + * up with the (post state-materialization) base/result/state storage layout. + */ + private def portBaseURIOf(cachedResultUri: URI): URI = { + val (workflowId, executionId, globalPortIdOpt, _) = VFSURIFactory.decodeURI(cachedResultUri) + val globalPortId = globalPortIdOpt.getOrElse( + throw new IllegalArgumentException( + s"Cached result URI is missing a globalPortId: $cachedResultUri" + ) + ) + VFSURIFactory.createPortBaseURI(workflowId, executionId, globalPortId) + } + /** * Prepare generic planning hints for CostBasedScheduleGenerator. */ @@ -214,7 +231,7 @@ object CacheReusePreSchedulingStep { .flatMap { pid => cachedOutputsByPort.get(pid).map { cached => pid -> OutputPortConfig( - storageURI = cached.resultUri, + storageURIBase = portBaseURIOf(cached.resultUri), cachedTupleCount = cached.tupleCount, materialize = false ) @@ -230,7 +247,7 @@ object CacheReusePreSchedulingStep { val outputPort = GlobalPortIdentity(link.fromOpId, link.fromPortId) cachedOutputsByPort.get(outputPort) match { case Some(cached) => - val uris = acc.getOrElse(inputPort, List.empty) :+ cached.resultUri + val uris = acc.getOrElse(inputPort, List.empty) :+ portBaseURIOf(cached.resultUri) acc.updated(inputPort, uris) case None => acc @@ -265,7 +282,7 @@ object CacheReusePreSchedulingStep { .flatMap { outputPort => cachedOutputsByPort.get(outputPort).map { cached => outputPort -> OutputPortConfig( - storageURI = cached.resultUri, + storageURIBase = portBaseURIOf(cached.resultUri), cachedTupleCount = cached.tupleCount, materialize = false ) diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/WorkerExecutionCompletedHandler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/WorkerExecutionCompletedHandler.scala index 01d18628d3..52155a6154 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/WorkerExecutionCompletedHandler.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/WorkerExecutionCompletedHandler.scala @@ -20,10 +20,7 @@ package org.apache.texera.amber.engine.architecture.controller.promisehandlers import com.twitter.util.Future -import org.apache.texera.amber.engine.architecture.controller.{ - ControllerAsyncRPCHandlerInitializer, - ExecutionStateUpdate -} +import org.apache.texera.amber.engine.architecture.controller.ControllerAsyncRPCHandlerInitializer import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{ AsyncRPCContext, EmptyRequest, diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala index aa1ff1fd43..d48c80cd41 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala @@ -38,7 +38,6 @@ import org.apache.texera.amber.engine.architecture.controller.execution.{ } import org.apache.texera.amber.engine.architecture.controller.{ ControllerConfig, - ExecutionStateUpdate, ExecutionStatsUpdate, RuntimeStatisticsPersist, WorkerAssignmentUpdate @@ -176,7 +175,7 @@ class RegionExecutionCoordinator( private def recordCachedOutputPortResults(resourceConfig: ResourceConfig): Unit = { resourceConfig.portConfigs.collect { case (gpid, cfg: OutputPortConfig) => - val storageUri = cfg.storageURI + val storageUri = VFSURIFactory.resultURI(cfg.storageURIBase) WorkflowExecutionsResource.insertOperatorPortResultUri( eid = executionId, globalPortId = gpid, @@ -598,7 +597,7 @@ class RegionExecutionCoordinator( WorkflowExecutionsResource.insertOperatorPortResultUri( eid = executionId, globalPortId = outputPortId, - uri = outputCfg.storageURI + uri = VFSURIFactory.resultURI(outputCfg.storageURIBase) ) case _ => } diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala index 161a116960..212a137918 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala @@ -26,8 +26,6 @@ import org.apache.texera.amber.engine.architecture.common.{ PekkoActorRefMappingService, PekkoActorService } -import org.apache.texera.amber.engine.architecture.controller.ControllerConfig -import org.apache.texera.amber.engine.architecture.controller.ExecutionStateUpdate import org.apache.texera.amber.engine.architecture.controller.{ ControllerConfig, ExecutionStateUpdate @@ -89,54 +87,51 @@ class WorkflowExecutionCoordinator( } // All existing regions are completed. Start the next region (if any). - val nextRegions = getNextRegions() + val nextRegions = if (!schedule.hasNext) Set.empty[Region] else schedule.next() if (nextRegions.isEmpty) { - if (regionExecutionCoordinators.values.forall(_.isCompleted)) { - asyncRPCClient.sendToClient( - ExecutionStateUpdate(workflowExecution.getState) - ) + if (workflowExecution.isCompleted && completionNotified.compareAndSet(false, true)) { + asyncRPCClient.sendToClient(ExecutionStateUpdate(workflowExecution.getState)) } - Future.Unit - } else { - executedRegions.append(nextRegions) - val launches = nextRegions - .map(region => { - workflowExecution.initRegionExecution(region) - regionExecutionCoordinators(region.id) = new RegionExecutionCoordinator( - region, - workflowExecution, - executionId, - asyncRPCClient, - controllerConfig, - actorService, - actorRefService - ) - regionExecutionCoordinators(region.id) - }) - .map(_.syncStatusAndTransitionRegionExecutionPhase()) - .toSeq - Future - .collect(launches) - .unit - .flatMap { _ => - if (regionExecutionCoordinators.values.exists(!_.isCompleted)) { - Future.Unit - } else { - // All launched regions finished immediately (e.g., cached); proceed to next batch. - coordinateRegionExecutors(actorService) - } - } - .map { _ => - if ( - regionExecutionCoordinators.values.forall(_.isCompleted) && - workflowExecution.isCompleted - ) { - asyncRPCClient.sendToClient( - ExecutionStateUpdate(workflowExecution.getState) + return Future.Unit + } + + executedRegions.append(nextRegions) + Future + .collect( + nextRegions + .map(region => { + val isRestart = workflowExecution.hasRegionExecution(region.id) + if (isRestart) { + workflowExecution.restartRegionExecution(region) + } else { + workflowExecution.initRegionExecution(region) + } + regionExecutionCoordinators(region.id) = new RegionExecutionCoordinator( + region, + isRestart, + workflowExecution, + executionId, + asyncRPCClient, + controllerConfig, + actorService, + actorRefService ) - } + regionExecutionCoordinators(region.id) + }) + .map(_.syncStatusAndTransitionRegionExecutionPhase()) + .toSeq + ) + .unit + .flatMap { _ => + if (regionExecutionCoordinators.values.exists(!_.isCompleted)) { + Future.Unit + } else { + // All launched regions finished immediately (e.g., cached); proceed to the next batch. + // Cached regions have no workers, so no port-completion event would otherwise re-trigger + // coordination, and completion is emitted once via the guard above when the schedule drains. + coordinateRegionExecutors(actorService) } - } + } } def getRegionOfLink(link: PhysicalLink): Region = { diff --git a/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.ts b/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.ts index 5e9e0b9d18..ae738e1e99 100644 --- a/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.ts +++ b/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.ts @@ -19,6 +19,15 @@ import { Component, OnInit } from "@angular/core"; import { ActivatedRoute } from "@angular/router"; +import { CommonModule } from "@angular/common"; +import { FormsModule } from "@angular/forms"; +import { NzButtonModule } from "ng-zorro-antd/button"; +import { NzPopconfirmModule } from "ng-zorro-antd/popconfirm"; +import { NzSwitchModule } from "ng-zorro-antd/switch"; +import { NzAlertModule } from "ng-zorro-antd/alert"; +import { NzTableModule } from "ng-zorro-antd/table"; +import { NzTagModule } from "ng-zorro-antd/tag"; +import { NzEmptyModule } from "ng-zorro-antd/empty"; import { finalize, tap } from "rxjs/operators"; import { UntilDestroy, untilDestroyed } from "@ngneat/until-destroy"; import { WorkflowCacheEntry } from "../../../../dashboard/type/workflow-cache-entry"; @@ -39,6 +48,17 @@ import { selector: "texera-cache-panel", templateUrl: "cache-panel.component.html", styleUrls: ["cache-panel.component.scss"], + imports: [ + CommonModule, + FormsModule, + NzButtonModule, + NzPopconfirmModule, + NzSwitchModule, + NzAlertModule, + NzTableModule, + NzTagModule, + NzEmptyModule, + ], }) export class CachePanelComponent implements OnInit { public cacheEntries: WorkflowCacheEntry[] = []; diff --git a/frontend/src/app/workspace/component/workflow-editor/context-menu/context-menu/context-menu.component.html b/frontend/src/app/workspace/component/workflow-editor/context-menu/context-menu/context-menu.component.html index 32c5c9a9b4..9d963aaf42 100644 --- a/frontend/src/app/workspace/component/workflow-editor/context-menu/context-menu/context-menu.component.html +++ b/frontend/src/app/workspace/component/workflow-editor/context-menu/context-menu/context-menu.component.html @@ -125,8 +125,8 @@ </li> <li nz-menu-item - *ngIf="operatorMenuService.highlightedOperators.value.length === 1 && - operatorMenuService.highlightedCommentBoxes.value.length === 0 && + *ngIf="highlightedOperatorIds.length === 1 && + highlightedCommentBoxIds.length === 0 && !hasHighlightedLinks() && isWorkflowModifiable" (click)="clearCacheForSelectedOperator()"> @@ -138,8 +138,8 @@ </li> <li nz-menu-item - *ngIf="highlightedOperators.value.length === 1 && - highlightedCommentBoxes.value.length === 0 && + *ngIf="highlightedOperatorIds.length === 1 && + highlightedCommentBoxIds.length === 0 && !hasHighlightedLinks() && isWorkflowModifiable" (click)="clearCacheUpToSelectedOperator()"> diff --git a/frontend/src/app/workspace/component/workflow-editor/workflow-editor.component.ts b/frontend/src/app/workspace/component/workflow-editor/workflow-editor.component.ts index 39601d9347..bf4a1180ab 100644 --- a/frontend/src/app/workspace/component/workflow-editor/workflow-editor.component.ts +++ b/frontend/src/app/workspace/component/workflow-editor/workflow-editor.component.ts @@ -31,7 +31,7 @@ import { CacheUsageService } from "../../service/workflow-status/cache-usage.ser import { WorkflowCacheEntriesService } from "../../service/workflow-status/workflow-cache-entries.service"; import { WorkflowStatusService } from "../../service/workflow-status/workflow-status.service"; import { ExecutionState, OperatorState } from "../../types/execute-workflow.interface"; -import { LogicalPort, OperatorLink, OperatorPredicate } from "../../types/workflow-common.interface"; +import { LogicalPort, OperatorLink } from "../../types/workflow-common.interface"; import { WorkflowCacheEntry } from "../../../dashboard/type/workflow-cache-entry"; import { auditTime, filter, map, takeUntil, withLatestFrom } from "rxjs/operators"; import { UntilDestroy, untilDestroyed } from "@ngneat/until-destroy"; @@ -480,8 +480,6 @@ export class WorkflowEditorComponent implements OnInit, AfterViewInit, OnDestroy ); let regionMap: { regionElement: joint.dia.Element; operators: joint.dia.Cell[] }[] = []; - // Track whether regions should be visible (preserved across element recreation) - let regionsVisible = false; const colorMap: Record<string, string> = { ExecutingDependeePortsPhase: "rgba(33,150,243,0.2)", ExecutingNonDependeePortsPhase: "rgba(255,213,79,0.2)", @@ -494,25 +492,16 @@ export class WorkflowEditorComponent implements OnInit, AfterViewInit, OnDestroy .getRegionUpdateStream() .pipe(untilDestroyed(this)) .subscribe(event => { - // Preserve visibility state from existing elements before removing them - const existingRegions = this.paper.model.getCells().filter(element => element instanceof Region); - if (existingRegions.length > 0) { - regionsVisible = existingRegions[0].attr("body/visibility") === "visible"; - } else { - // No existing regions - use the shared state from workflowActionService - regionsVisible = this.workflowActionService.getShowRegion(); - } - existingRegions.forEach(element => element.remove()); + this.paper.model + .getCells() + .filter(element => element instanceof Region) + .forEach(element => element.remove()); regionMap = event.regions.map(([id, region]) => { const element = new Region({ id: "region-" + id }); const ops = region.map(id => this.paper.getModelById(id)); this.paper.model.addCell(element); this.updateRegionElement(element, ops); - // Apply visibility state - if (regionsVisible) { - element.attr("body/visibility", "visible"); - } return { regionElement: element, operators: ops }; }); // regions are recreated on every update, so reapply the current toggle state to the new elements diff --git a/frontend/src/app/workspace/service/joint-ui/joint-ui.service.ts b/frontend/src/app/workspace/service/joint-ui/joint-ui.service.ts index 0320e971fa..2d03f71aab 100644 --- a/frontend/src/app/workspace/service/joint-ui/joint-ui.service.ts +++ b/frontend/src/app/workspace/service/joint-ui/joint-ui.service.ts @@ -423,9 +423,6 @@ export class JointUIService { if (portId != null) { const parts = portId.split("-"); const numericSuffix = parts.length > 1 ? parts[1] : portId; - const count: number = inputMetrics[numericSuffix] ?? 0; - element.portProp(portId, "attrs/.port-label/text", count.toLocaleString()); - const count = inputMetrics[numericSuffix]; const rawAttrs = (portDef.attrs as any) || {}; const oldText: string = (rawAttrs[".port-label"] && rawAttrs[".port-label"].text) || ""; @@ -448,7 +445,6 @@ export class JointUIService { const parts = portId.split("-"); const numericSuffix = parts.length > 1 ? parts[1] : portId; const count: number = outputMetrics[numericSuffix] ?? 0; - element.portProp(portId, "attrs/.port-label/text", count.toLocaleString()); const rawAttrs = (portDef.attrs as any) || {}; const oldText: string = (rawAttrs[".port-label"] && rawAttrs[".port-label"].text) || ""; let originalName = oldText.includes(":") ? oldText.split(":", 1)[0].trim() : oldText; @@ -842,40 +838,6 @@ export class JointUIService { ]; } - /** - * This function create a custom svg style for the operator - * @returns the custom attributes of the tooltip. - */ - public static getCustomOperatorStatusTooltipStyleAttrs(): joint.shapes.devs.ModelSelectors { - return { - "element-node": { - style: { "pointer-events": "none" }, - }, - polygon: { - fill: "#FFFFFF", - "follow-scale": true, - stroke: "purple", - "stroke-width": "2", - rx: "5px", - ry: "5px", - refPoints: "0,30 150,30 150,120 85,120 75,150 65,120 0,120", - display: "none", - style: { "pointer-events": "none" }, - }, - "#operatorCount": { - fill: "#595959", - "font-size": "12px", - ref: "polygon", - "y-alignment": "middle", - "x-alignment": "left", - "ref-x": 0.05, - "ref-y": 0.2, - display: "none", - style: { "pointer-events": "none" }, - }, - }; - } - /** * This function creates a custom svg style for the operator. * This function also makes the delete button defined above to emit the delete event that will
