This is an automated email from the ASF dual-hosted git repository.
github-merge-queue[bot] pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new 6a9437ed6e refactor(frontend): standardize menu.component subscription
cleanup (#5324)
6a9437ed6e is described below
commit 6a9437ed6e46599044f4f7762dc23e4065624262
Author: Matthew B. <[email protected]>
AuthorDate: Fri Jun 5 17:46:29 2026 -0700
refactor(frontend): standardize menu.component subscription cleanup (#5324)
### What changes were proposed in this PR?
- Removed the redundant `computingUnitStatusSubscription` field (and its
`.add()` wrapper and `ngOnDestroy` unsubscribe) in `menu.component.ts`:
the inner stream is already piped through `untilDestroyed(this)`, so the
manual aggregator did nothing.
- Replaced the manually managed `durationUpdateSubscription` with a
declarative `switchMap` pipe, so each `ExecutionDurationUpdateEvent`
cancels the previous 1s timer and starts a new one only while running,
instead of imperatively unsubscribing and reassigning a field.
- Dropped the now-unused `Subscription` import, leaving the component on
a single consistent cleanup style (`untilDestroyed`).
### Any related issues, documentation, or discussions?
Closes: #5323
### How was this PR tested?
- Ran `ng test
--include='src/app/workspace/component/menu/menu.component.spec.ts'`:
all 42 tests pass.
- Behavior-preserving refactor: same elapsed-timer cadence, same
base-duration updates, same teardown on destroy.
### Was this PR authored or co-authored using generative AI tooling?
Co-authored with Claude Opus 4.7 in compliance with ASF
---
.../component/menu/menu.component.spec.ts | 150 ++++++++++++++++++++-
.../app/workspace/component/menu/menu.component.ts | 42 +++---
2 files changed, 166 insertions(+), 26 deletions(-)
diff --git a/frontend/src/app/workspace/component/menu/menu.component.spec.ts
b/frontend/src/app/workspace/component/menu/menu.component.spec.ts
index a1865a2628..3825da95a0 100644
--- a/frontend/src/app/workspace/component/menu/menu.component.spec.ts
+++ b/frontend/src/app/workspace/component/menu/menu.component.spec.ts
@@ -22,9 +22,11 @@ import { ComponentFixture, TestBed } from
"@angular/core/testing";
import { HttpClientTestingModule } from "@angular/common/http/testing";
import { RouterTestingModule } from "@angular/router/testing";
import { NzModalService, NzModalModule, NzModalRef } from
"ng-zorro-antd/modal";
-import { BehaviorSubject, of, throwError } from "rxjs";
+import { BehaviorSubject, of, Subject, throwError } from "rxjs";
import { MenuComponent } from "./menu.component";
+import { WorkflowWebsocketService } from
"../../service/workflow-websocket/workflow-websocket.service";
+import type { ExecutionDurationUpdateEvent } from
"../../types/workflow-websocket.interface";
import { OperatorMetadataService } from
"../../service/operator-metadata/operator-metadata.service";
import { StubOperatorMetadataService } from
"../../service/operator-metadata/stub-operator-metadata.service";
import { ComputingUnitStatusService } from
"../../../common/service/computing-unit/computing-unit-status/computing-unit-status.service";
@@ -636,4 +638,150 @@ describe("MenuComponent", () => {
});
});
});
+
+ // Regression coverage for #5323: the elapsed-time timer was refactored from
a
+ // manually managed `durationUpdateSubscription` into a declarative
`switchMap`
+ // pipe terminated by `untilDestroyed`. These tests pin the resulting
behavior
+ // (base-duration updates, 1s cadence, restart-on-event, stop-when-idle) and,
+ // crucially, that the timer is torn down with the component so it cannot
keep
+ // firing or leak after destroy.
+ describe("execution duration timer", () => {
+ let durationEvents$: Subject<{ type: "ExecutionDurationUpdateEvent" } &
ExecutionDurationUpdateEvent>;
+ let timerFixture: ComponentFixture<MenuComponent>;
+ let timerComponent: MenuComponent;
+
+ function emitDuration(duration: number, isRunning: boolean): void {
+ durationEvents$.next({ type: "ExecutionDurationUpdateEvent", duration,
isRunning });
+ }
+
+ beforeEach(() => {
+ vi.useFakeTimers();
+ durationEvents$ = new Subject();
+ const websocket = TestBed.inject(WorkflowWebsocketService);
+ const original = websocket.subscribeToEvent.bind(websocket);
+ // Only intercept the duration event; defer every other event type to the
+ // real implementation so unrelated subscriptions keep working.
+ vi.spyOn(websocket, "subscribeToEvent").mockImplementation((type: any) =>
+ type === "ExecutionDurationUpdateEvent" ?
(durationEvents$.asObservable() as any) : original(type)
+ );
+
+ timerFixture = TestBed.createComponent(MenuComponent);
+ timerComponent = timerFixture.componentInstance;
+ timerFixture.detectChanges();
+ });
+
+ afterEach(() => {
+ vi.useRealTimers();
+ });
+
+ it("sets executionDuration to the event's base duration on each event", ()
=> {
+ emitDuration(5000, false);
+ expect(timerComponent.executionDuration).toBe(5000);
+
+ emitDuration(8000, false);
+ expect(timerComponent.executionDuration).toBe(8000);
+ });
+
+ it("advances the duration by 1s every second while running", () => {
+ emitDuration(0, true);
+ expect(timerComponent.executionDuration).toBe(0);
+
+ vi.advanceTimersByTime(1000);
+ expect(timerComponent.executionDuration).toBe(1000);
+
+ vi.advanceTimersByTime(2000);
+ expect(timerComponent.executionDuration).toBe(3000);
+ });
+
+ it("does not start a timer when the execution is not running", () => {
+ emitDuration(7000, false);
+
+ vi.advanceTimersByTime(5000);
+
+ expect(timerComponent.executionDuration).toBe(7000);
+ });
+
+ it("restarts the 1s timer on each new running event, cancelling the
previous one", () => {
+ emitDuration(0, true);
+ vi.advanceTimersByTime(1000);
+ expect(timerComponent.executionDuration).toBe(1000);
+
+ // A new event resets the base duration and restarts the cadence; the
+ // previous timer must be cancelled (switchMap) so it cannot
double-count.
+ emitDuration(10000, true);
+ expect(timerComponent.executionDuration).toBe(10000);
+
+ vi.advanceTimersByTime(500);
+ expect(timerComponent.executionDuration).toBe(10000);
+
+ vi.advanceTimersByTime(500);
+ expect(timerComponent.executionDuration).toBe(11000);
+ });
+
+ it("stops the timer when a running execution transitions to not running",
() => {
+ emitDuration(0, true);
+ vi.advanceTimersByTime(1000);
+ expect(timerComponent.executionDuration).toBe(1000);
+
+ emitDuration(2000, false);
+ vi.advanceTimersByTime(5000);
+ expect(timerComponent.executionDuration).toBe(2000);
+ });
+
+ it("tears down the timer on destroy so the duration stops advancing", ()
=> {
+ emitDuration(0, true);
+ vi.advanceTimersByTime(1000);
+ expect(timerComponent.executionDuration).toBe(1000);
+
+ timerFixture.destroy();
+
+ // The previously running timer must not keep firing after destroy...
+ vi.advanceTimersByTime(5000);
+ expect(timerComponent.executionDuration).toBe(1000);
+
+ // ...nor should late events revive it (the source subscription is
closed).
+ emitDuration(9999, true);
+ vi.advanceTimersByTime(5000);
+ expect(timerComponent.executionDuration).toBe(1000);
+ });
+ });
+
+ // Regression coverage for #5323: the computing-unit status subscription lost
+ // its manual `computingUnitStatusSubscription` aggregator and its
+ // `ngOnDestroy` unsubscribe, relying on `untilDestroyed` instead. These
tests
+ // pin both that status updates still propagate and that they stop on
destroy.
+ describe("computing unit status subscription", () => {
+ let status$: Subject<ComputingUnitState>;
+ let cuFixture: ComponentFixture<MenuComponent>;
+ let cuComponent: MenuComponent;
+
+ beforeEach(() => {
+ status$ = new Subject<ComputingUnitState>();
+ const cuService = TestBed.inject(ComputingUnitStatusService);
+ vi.spyOn(cuService, "getStatus").mockReturnValue(status$.asObservable());
+
+ cuFixture = TestBed.createComponent(MenuComponent);
+ cuComponent = cuFixture.componentInstance;
+ cuFixture.detectChanges();
+ });
+
+ it("updates computingUnitStatus and re-applies the run button behavior on
each status emission", () => {
+ const applySpy = vi.spyOn(cuComponent, "applyRunButtonBehavior");
+
+ status$.next(ComputingUnitState.Running);
+
+ expect(cuComponent.computingUnitStatus).toBe(ComputingUnitState.Running);
+ expect(applySpy).toHaveBeenCalledTimes(1);
+ });
+
+ it("stops updating computingUnitStatus once the component is destroyed",
() => {
+ status$.next(ComputingUnitState.Running);
+ expect(cuComponent.computingUnitStatus).toBe(ComputingUnitState.Running);
+
+ cuFixture.destroy();
+
+ status$.next(ComputingUnitState.NoComputingUnit);
+ expect(cuComponent.computingUnitStatus).toBe(ComputingUnitState.Running);
+ });
+ });
});
diff --git a/frontend/src/app/workspace/component/menu/menu.component.ts
b/frontend/src/app/workspace/component/menu/menu.component.ts
index 69f08e4fef..aa9b3a0727 100644
--- a/frontend/src/app/workspace/component/menu/menu.component.ts
+++ b/frontend/src/app/workspace/component/menu/menu.component.ts
@@ -33,7 +33,7 @@ import { WorkflowActionService } from
"../../service/workflow-graph/model/workfl
import { ExecutionState } from "../../types/execute-workflow.interface";
import { WorkflowWebsocketService } from
"../../service/workflow-websocket/workflow-websocket.service";
import { WorkflowResultExportService } from
"../../service/workflow-result-export/workflow-result-export.service";
-import { catchError, debounceTime, filter, mergeMap, tap } from
"rxjs/operators";
+import { catchError, debounceTime, filter, mergeMap, switchMap, tap } from
"rxjs/operators";
import { UntilDestroy, untilDestroyed } from "@ngneat/until-destroy";
import { WorkflowUtilService } from
"../../service/workflow-graph/util/workflow-util.service";
import { WorkflowVersionService } from
"../../../dashboard/service/user/workflow-version/workflow-version.service";
@@ -43,7 +43,7 @@ import { saveAs } from "file-saver";
import { NotificationService } from
"src/app/common/service/notification/notification.service";
import { OperatorMenuService } from
"../../service/operator-menu/operator-menu.service";
import { CoeditorPresenceService } from
"../../service/workflow-graph/model/coeditor-presence.service";
-import { firstValueFrom, of, Subscription, timer } from "rxjs";
+import { EMPTY, firstValueFrom, of, timer } from "rxjs";
import { isDefined } from "../../../common/util/predicate";
import { NzModalService } from "ng-zorro-antd/modal";
import { ResultExportationComponent } from
"../result-exportation/result-exportation.component";
@@ -154,14 +154,12 @@ export class MenuComponent implements OnInit, OnDestroy {
public runDisable = false;
public executionDuration = 0;
- private durationUpdateSubscription: Subscription = new Subscription();
// flag to display a particular version in the current canvas
public displayParticularWorkflowVersion: boolean = false;
public onClickRunHandler: () => void;
// Computing unit status variables
- private computingUnitStatusSubscription: Subscription = new Subscription();
public selectedComputingUnit: DashboardWorkflowComputingUnit | null = null;
public computingUnitStatus: ComputingUnitState =
ComputingUnitState.NoComputingUnit;
@@ -193,17 +191,14 @@ export class MenuComponent implements OnInit, OnDestroy {
) {
workflowWebsocketService
.subscribeToEvent("ExecutionDurationUpdateEvent")
- .pipe(untilDestroyed(this))
- .subscribe(event => {
- this.executionDuration = event.duration;
- this.durationUpdateSubscription.unsubscribe();
- if (event.isRunning) {
- this.durationUpdateSubscription = timer(1000, 1000)
- .pipe(untilDestroyed(this))
- .subscribe(() => {
- this.executionDuration += 1000;
- });
- }
+ .pipe(
+ tap(event => (this.executionDuration = event.duration)),
+ // restart the 1s timer on each event, only while running
+ switchMap(event => (event.isRunning ? timer(1000, 1000) : EMPTY)),
+ untilDestroyed(this)
+ )
+ .subscribe(() => {
+ this.executionDuration += 1000;
});
this.executionState = executeWorkflowService.getExecutionState().state;
// return the run button after the execution is finished, either
@@ -254,7 +249,6 @@ export class MenuComponent implements OnInit, OnDestroy {
ngOnDestroy(): void {
this.workflowResultExportService.resetFlags();
- this.computingUnitStatusSubscription.unsubscribe();
}
private subscribeToComputingUnitSelection(): void {
@@ -271,15 +265,13 @@ export class MenuComponent implements OnInit, OnDestroy {
*/
private subscribeToComputingUnitStatus(): void {
// Subscribe to get the computing unit status
- this.computingUnitStatusSubscription.add(
- this.computingUnitStatusService
- .getStatus()
- .pipe(untilDestroyed(this))
- .subscribe(status => {
- this.computingUnitStatus = status;
- this.applyRunButtonBehavior(this.getRunButtonBehavior());
- })
- );
+ this.computingUnitStatusService
+ .getStatus()
+ .pipe(untilDestroyed(this))
+ .subscribe(status => {
+ this.computingUnitStatus = status;
+ this.applyRunButtonBehavior(this.getRunButtonBehavior());
+ });
}
/**