This is an automated email from the ASF dual-hosted git repository.

github-merge-queue[bot] pushed a commit to branch 
gh-readonly-queue/main/pr-5324-76f116f5e0f7b29c2e6b18cdf1688e448d2ddee3
in repository https://gitbox.apache.org/repos/asf/texera.git

commit 6a9437ed6e46599044f4f7762dc23e4065624262
Author: Matthew B. <[email protected]>
AuthorDate: Fri Jun 5 17:46:29 2026 -0700

    refactor(frontend): standardize menu.component subscription cleanup (#5324)
    
    ### What changes were proposed in this PR?
    - Removed the redundant `computingUnitStatusSubscription` field (and its
    `.add()` wrapper and `ngOnDestroy` unsubscribe) in `menu.component.ts`:
    the inner stream is already piped through `untilDestroyed(this)`, so the
    manual aggregator did nothing.
    - Replaced the manually managed `durationUpdateSubscription` with a
    declarative `switchMap` pipe, so each `ExecutionDurationUpdateEvent`
    cancels the previous 1s timer and starts a new one only while running,
    instead of imperatively unsubscribing and reassigning a field.
    - Dropped the now-unused `Subscription` import, leaving the component on
    a single consistent cleanup style (`untilDestroyed`).
    ### Any related issues, documentation, or discussions?
    Closes: #5323
    ### How was this PR tested?
    - Ran `ng test
    --include='src/app/workspace/component/menu/menu.component.spec.ts'`:
    all 42 tests pass.
    - Behavior-preserving refactor: same elapsed-timer cadence, same
    base-duration updates, same teardown on destroy.
    ### Was this PR authored or co-authored using generative AI tooling?
    Co-authored with Claude Opus 4.7, in compliance with the ASF generative tooling guidance.
---
 .../component/menu/menu.component.spec.ts          | 150 ++++++++++++++++++++-
 .../app/workspace/component/menu/menu.component.ts |  42 +++---
 2 files changed, 166 insertions(+), 26 deletions(-)

diff --git a/frontend/src/app/workspace/component/menu/menu.component.spec.ts b/frontend/src/app/workspace/component/menu/menu.component.spec.ts
index a1865a2628..3825da95a0 100644
--- a/frontend/src/app/workspace/component/menu/menu.component.spec.ts
+++ b/frontend/src/app/workspace/component/menu/menu.component.spec.ts
@@ -22,9 +22,11 @@ import { ComponentFixture, TestBed } from 
"@angular/core/testing";
 import { HttpClientTestingModule } from "@angular/common/http/testing";
 import { RouterTestingModule } from "@angular/router/testing";
 import { NzModalService, NzModalModule, NzModalRef } from 
"ng-zorro-antd/modal";
-import { BehaviorSubject, of, throwError } from "rxjs";
+import { BehaviorSubject, of, Subject, throwError } from "rxjs";
 
 import { MenuComponent } from "./menu.component";
+import { WorkflowWebsocketService } from 
"../../service/workflow-websocket/workflow-websocket.service";
+import type { ExecutionDurationUpdateEvent } from 
"../../types/workflow-websocket.interface";
 import { OperatorMetadataService } from 
"../../service/operator-metadata/operator-metadata.service";
 import { StubOperatorMetadataService } from 
"../../service/operator-metadata/stub-operator-metadata.service";
 import { ComputingUnitStatusService } from 
"../../../common/service/computing-unit/computing-unit-status/computing-unit-status.service";
@@ -636,4 +638,150 @@ describe("MenuComponent", () => {
       });
     });
   });
+
+  // Regression coverage for #5323: the elapsed-time timer was refactored from a
+  // manually managed `durationUpdateSubscription` into a declarative `switchMap`
+  // pipe terminated by `untilDestroyed`. These tests pin the resulting behavior
+  // (base-duration updates, 1s cadence, restart-on-event, stop-when-idle) and,
+  // crucially, that the timer is torn down with the component so it cannot keep
+  // firing or leak after destroy.
+  describe("execution duration timer", () => {
+    let durationEvents$: Subject<{ type: "ExecutionDurationUpdateEvent" } & 
ExecutionDurationUpdateEvent>;
+    let timerFixture: ComponentFixture<MenuComponent>;
+    let timerComponent: MenuComponent;
+
+    function emitDuration(duration: number, isRunning: boolean): void {
+      durationEvents$.next({ type: "ExecutionDurationUpdateEvent", duration, 
isRunning });
+    }
+
+    beforeEach(() => {
+      vi.useFakeTimers();
+      durationEvents$ = new Subject();
+      const websocket = TestBed.inject(WorkflowWebsocketService);
+      const original = websocket.subscribeToEvent.bind(websocket);
+      // Only intercept the duration event; defer every other event type to the
+      // real implementation so unrelated subscriptions keep working.
+      vi.spyOn(websocket, "subscribeToEvent").mockImplementation((type: any) =>
+        type === "ExecutionDurationUpdateEvent" ? 
(durationEvents$.asObservable() as any) : original(type)
+      );
+
+      timerFixture = TestBed.createComponent(MenuComponent);
+      timerComponent = timerFixture.componentInstance;
+      timerFixture.detectChanges();
+    });
+
+    afterEach(() => {
+      vi.useRealTimers();
+    });
+
+    it("sets executionDuration to the event's base duration on each event", () 
=> {
+      emitDuration(5000, false);
+      expect(timerComponent.executionDuration).toBe(5000);
+
+      emitDuration(8000, false);
+      expect(timerComponent.executionDuration).toBe(8000);
+    });
+
+    it("advances the duration by 1s every second while running", () => {
+      emitDuration(0, true);
+      expect(timerComponent.executionDuration).toBe(0);
+
+      vi.advanceTimersByTime(1000);
+      expect(timerComponent.executionDuration).toBe(1000);
+
+      vi.advanceTimersByTime(2000);
+      expect(timerComponent.executionDuration).toBe(3000);
+    });
+
+    it("does not start a timer when the execution is not running", () => {
+      emitDuration(7000, false);
+
+      vi.advanceTimersByTime(5000);
+
+      expect(timerComponent.executionDuration).toBe(7000);
+    });
+
+    it("restarts the 1s timer on each new running event, cancelling the 
previous one", () => {
+      emitDuration(0, true);
+      vi.advanceTimersByTime(1000);
+      expect(timerComponent.executionDuration).toBe(1000);
+
+      // A new event resets the base duration and restarts the cadence; the
+      // previous timer must be cancelled (switchMap) so it cannot 
double-count.
+      emitDuration(10000, true);
+      expect(timerComponent.executionDuration).toBe(10000);
+
+      vi.advanceTimersByTime(500);
+      expect(timerComponent.executionDuration).toBe(10000);
+
+      vi.advanceTimersByTime(500);
+      expect(timerComponent.executionDuration).toBe(11000);
+    });
+
+    it("stops the timer when a running execution transitions to not running", 
() => {
+      emitDuration(0, true);
+      vi.advanceTimersByTime(1000);
+      expect(timerComponent.executionDuration).toBe(1000);
+
+      emitDuration(2000, false);
+      vi.advanceTimersByTime(5000);
+      expect(timerComponent.executionDuration).toBe(2000);
+    });
+
+    it("tears down the timer on destroy so the duration stops advancing", () 
=> {
+      emitDuration(0, true);
+      vi.advanceTimersByTime(1000);
+      expect(timerComponent.executionDuration).toBe(1000);
+
+      timerFixture.destroy();
+
+      // The previously running timer must not keep firing after destroy...
+      vi.advanceTimersByTime(5000);
+      expect(timerComponent.executionDuration).toBe(1000);
+
+      // ...nor should late events revive it (the source subscription is 
closed).
+      emitDuration(9999, true);
+      vi.advanceTimersByTime(5000);
+      expect(timerComponent.executionDuration).toBe(1000);
+    });
+  });
+
+  // Regression coverage for #5323: the computing-unit status subscription lost
+  // its manual `computingUnitStatusSubscription` aggregator and its
+  // `ngOnDestroy` unsubscribe, relying on `untilDestroyed` instead. These tests
+  // pin both that status updates still propagate and that they stop on destroy.
+  describe("computing unit status subscription", () => {
+    let status$: Subject<ComputingUnitState>;
+    let cuFixture: ComponentFixture<MenuComponent>;
+    let cuComponent: MenuComponent;
+
+    beforeEach(() => {
+      status$ = new Subject<ComputingUnitState>();
+      const cuService = TestBed.inject(ComputingUnitStatusService);
+      vi.spyOn(cuService, "getStatus").mockReturnValue(status$.asObservable());
+
+      cuFixture = TestBed.createComponent(MenuComponent);
+      cuComponent = cuFixture.componentInstance;
+      cuFixture.detectChanges();
+    });
+
+    it("updates computingUnitStatus and re-applies the run button behavior on 
each status emission", () => {
+      const applySpy = vi.spyOn(cuComponent, "applyRunButtonBehavior");
+
+      status$.next(ComputingUnitState.Running);
+
+      expect(cuComponent.computingUnitStatus).toBe(ComputingUnitState.Running);
+      expect(applySpy).toHaveBeenCalledTimes(1);
+    });
+
+    it("stops updating computingUnitStatus once the component is destroyed", 
() => {
+      status$.next(ComputingUnitState.Running);
+      expect(cuComponent.computingUnitStatus).toBe(ComputingUnitState.Running);
+
+      cuFixture.destroy();
+
+      status$.next(ComputingUnitState.NoComputingUnit);
+      expect(cuComponent.computingUnitStatus).toBe(ComputingUnitState.Running);
+    });
+  });
 });
diff --git a/frontend/src/app/workspace/component/menu/menu.component.ts b/frontend/src/app/workspace/component/menu/menu.component.ts
index 69f08e4fef..aa9b3a0727 100644
--- a/frontend/src/app/workspace/component/menu/menu.component.ts
+++ b/frontend/src/app/workspace/component/menu/menu.component.ts
@@ -33,7 +33,7 @@ import { WorkflowActionService } from 
"../../service/workflow-graph/model/workfl
 import { ExecutionState } from "../../types/execute-workflow.interface";
 import { WorkflowWebsocketService } from 
"../../service/workflow-websocket/workflow-websocket.service";
 import { WorkflowResultExportService } from 
"../../service/workflow-result-export/workflow-result-export.service";
-import { catchError, debounceTime, filter, mergeMap, tap } from 
"rxjs/operators";
+import { catchError, debounceTime, filter, mergeMap, switchMap, tap } from 
"rxjs/operators";
 import { UntilDestroy, untilDestroyed } from "@ngneat/until-destroy";
 import { WorkflowUtilService } from 
"../../service/workflow-graph/util/workflow-util.service";
 import { WorkflowVersionService } from 
"../../../dashboard/service/user/workflow-version/workflow-version.service";
@@ -43,7 +43,7 @@ import { saveAs } from "file-saver";
 import { NotificationService } from 
"src/app/common/service/notification/notification.service";
 import { OperatorMenuService } from 
"../../service/operator-menu/operator-menu.service";
 import { CoeditorPresenceService } from 
"../../service/workflow-graph/model/coeditor-presence.service";
-import { firstValueFrom, of, Subscription, timer } from "rxjs";
+import { EMPTY, firstValueFrom, of, timer } from "rxjs";
 import { isDefined } from "../../../common/util/predicate";
 import { NzModalService } from "ng-zorro-antd/modal";
 import { ResultExportationComponent } from 
"../result-exportation/result-exportation.component";
@@ -154,14 +154,12 @@ export class MenuComponent implements OnInit, OnDestroy {
   public runDisable = false;
 
   public executionDuration = 0;
-  private durationUpdateSubscription: Subscription = new Subscription();
 
   // flag to display a particular version in the current canvas
   public displayParticularWorkflowVersion: boolean = false;
   public onClickRunHandler: () => void;
 
   // Computing unit status variables
-  private computingUnitStatusSubscription: Subscription = new Subscription();
   public selectedComputingUnit: DashboardWorkflowComputingUnit | null = null;
   public computingUnitStatus: ComputingUnitState = 
ComputingUnitState.NoComputingUnit;
 
@@ -193,17 +191,14 @@ export class MenuComponent implements OnInit, OnDestroy {
   ) {
     workflowWebsocketService
       .subscribeToEvent("ExecutionDurationUpdateEvent")
-      .pipe(untilDestroyed(this))
-      .subscribe(event => {
-        this.executionDuration = event.duration;
-        this.durationUpdateSubscription.unsubscribe();
-        if (event.isRunning) {
-          this.durationUpdateSubscription = timer(1000, 1000)
-            .pipe(untilDestroyed(this))
-            .subscribe(() => {
-              this.executionDuration += 1000;
-            });
-        }
+      .pipe(
+        tap(event => (this.executionDuration = event.duration)),
+        // restart the 1s timer on each event, only while running
+        switchMap(event => (event.isRunning ? timer(1000, 1000) : EMPTY)),
+        untilDestroyed(this)
+      )
+      .subscribe(() => {
+        this.executionDuration += 1000;
       });
     this.executionState = executeWorkflowService.getExecutionState().state;
     // return the run button after the execution is finished, either
@@ -254,7 +249,6 @@ export class MenuComponent implements OnInit, OnDestroy {
 
   ngOnDestroy(): void {
     this.workflowResultExportService.resetFlags();
-    this.computingUnitStatusSubscription.unsubscribe();
   }
 
   private subscribeToComputingUnitSelection(): void {
@@ -271,15 +265,13 @@ export class MenuComponent implements OnInit, OnDestroy {
    */
   private subscribeToComputingUnitStatus(): void {
     // Subscribe to get the computing unit status
-    this.computingUnitStatusSubscription.add(
-      this.computingUnitStatusService
-        .getStatus()
-        .pipe(untilDestroyed(this))
-        .subscribe(status => {
-          this.computingUnitStatus = status;
-          this.applyRunButtonBehavior(this.getRunButtonBehavior());
-        })
-    );
+    this.computingUnitStatusService
+      .getStatus()
+      .pipe(untilDestroyed(this))
+      .subscribe(status => {
+        this.computingUnitStatus = status;
+        this.applyRunButtonBehavior(this.getRunButtonBehavior());
+      });
   }
 
   /**

Reply via email to