This is an automated email from the ASF dual-hosted git repository.
linxinyuan pushed a commit to branch xinyuan-stage-by-stage
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/xinyuan-stage-by-stage by this
push:
new a1119bc0ab update
a1119bc0ab is described below
commit a1119bc0abc8ce3206df14c6e1ec40837c6fd528
Author: Xinyuan Lin <[email protected]>
AuthorDate: Sat Jan 10 01:34:58 2026 -0800
update
---
.../controller/WorkflowScheduler.scala | 2 +-
.../apache/texera/workflow/WorkflowCompiler.scala | 2 +-
.../engine/e2e/BatchSizePropagationSpec.scala | 12 ++---
.../texera/amber/core/workflow/ExecutionMode.java | 6 +++
.../texera/amber/core/workflow/PhysicalPlan.scala | 4 +-
.../amber/core/workflow/WorkflowContext.scala | 2 +-
.../amber/core/workflow/WorkflowSettings.scala | 8 ++--
frontend/src/app/common/type/workflow.ts | 7 ++-
.../user/user-workflow/user-workflow.component.ts | 52 ++++++++++++----------
.../left-panel/settings/settings.component.html | 10 ++---
.../left-panel/settings/settings.component.ts | 20 ++++-----
.../model/workflow-action.service.ts | 38 ++++++++--------
12 files changed, 89 insertions(+), 74 deletions(-)
diff --git
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/WorkflowScheduler.scala
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/WorkflowScheduler.scala
index dec3fc3c7a..fbe5e73a1d 100644
---
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/WorkflowScheduler.scala
+++
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/WorkflowScheduler.scala
@@ -45,7 +45,7 @@ class WorkflowScheduler(
// CostBasedRegionPlanGenerator considers costs to try to find an
optimal plan.
new CostBasedScheduleGenerator(
workflowContext,
- physicalPlan.copy(batchProcessing =
workflowContext.workflowSettings.batchProcessing),
+ physicalPlan.copy(executionMode =
workflowContext.workflowSettings.executionMode),
actorId
).generate()
this.schedule = generatedSchedule
diff --git
a/amber/src/main/scala/org/apache/texera/workflow/WorkflowCompiler.scala
b/amber/src/main/scala/org/apache/texera/workflow/WorkflowCompiler.scala
index 4d76743c30..7f17e1aebd 100644
--- a/amber/src/main/scala/org/apache/texera/workflow/WorkflowCompiler.scala
+++ b/amber/src/main/scala/org/apache/texera/workflow/WorkflowCompiler.scala
@@ -154,7 +154,7 @@ class WorkflowCompiler(
context.workflowSettings = WorkflowSettings(
context.workflowSettings.dataTransferBatchSize,
outputPortsNeedingStorage,
- context.workflowSettings.batchProcessing
+ context.workflowSettings.executionMode
)
Workflow(context, logicalPlan, physicalPlan)
diff --git
a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/BatchSizePropagationSpec.scala
b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/BatchSizePropagationSpec.scala
index 8b0bd20166..35a81752ba 100644
---
a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/BatchSizePropagationSpec.scala
+++
b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/BatchSizePropagationSpec.scala
@@ -23,7 +23,7 @@ import org.apache.pekko.actor.{ActorSystem, Props}
import org.apache.pekko.testkit.{ImplicitSender, TestKit}
import org.apache.pekko.util.Timeout
import org.apache.texera.amber.clustering.SingleNodeListener
-import org.apache.texera.amber.core.workflow.{PortIdentity, WorkflowContext,
WorkflowSettings}
+import org.apache.texera.amber.core.workflow.{ExecutionMode, PortIdentity,
WorkflowContext, WorkflowSettings}
import org.apache.texera.amber.engine.architecture.controller._
import
org.apache.texera.amber.engine.architecture.sendsemantics.partitionings._
import org.apache.texera.amber.engine.common.virtualidentity.util.CONTROLLER
@@ -119,7 +119,7 @@ class BatchSizePropagationSpec
"Engine" should "propagate the correct batch size for headerlessCsv
workflow" in {
val expectedBatchSize = 1
- val customWorkflowSettings = WorkflowSettings(dataTransferBatchSize =
expectedBatchSize, batchProcessing = false)
+ val customWorkflowSettings = WorkflowSettings(dataTransferBatchSize =
expectedBatchSize, executionMode = ExecutionMode.STREAMING)
val context =
new WorkflowContext(workflowSettings = customWorkflowSettings)
@@ -141,7 +141,7 @@ class BatchSizePropagationSpec
"Engine" should "propagate the correct batch size for headerlessCsv->keyword
workflow" in {
val expectedBatchSize = 500
- val customWorkflowSettings = WorkflowSettings(dataTransferBatchSize =
expectedBatchSize, batchProcessing = false)
+ val customWorkflowSettings = WorkflowSettings(dataTransferBatchSize =
expectedBatchSize, executionMode = ExecutionMode.STREAMING)
val context =
new WorkflowContext(workflowSettings = customWorkflowSettings)
@@ -171,7 +171,7 @@ class BatchSizePropagationSpec
"Engine" should "propagate the correct batch size for csv->keyword->count
workflow" in {
val expectedBatchSize = 100
- val customWorkflowSettings = WorkflowSettings(dataTransferBatchSize =
expectedBatchSize, batchProcessing = false)
+ val customWorkflowSettings = WorkflowSettings(dataTransferBatchSize =
expectedBatchSize, executionMode = ExecutionMode.STREAMING)
val context =
new WorkflowContext(workflowSettings = customWorkflowSettings)
@@ -209,7 +209,7 @@ class BatchSizePropagationSpec
"Engine" should "propagate the correct batch size for
csv->keyword->averageAndGroupBy workflow" in {
val expectedBatchSize = 300
- val customWorkflowSettings = WorkflowSettings(dataTransferBatchSize =
expectedBatchSize, batchProcessing = false)
+ val customWorkflowSettings = WorkflowSettings(dataTransferBatchSize =
expectedBatchSize, executionMode = ExecutionMode.STREAMING)
val context =
new WorkflowContext(workflowSettings = customWorkflowSettings)
@@ -250,7 +250,7 @@ class BatchSizePropagationSpec
"Engine" should "propagate the correct batch size for csv->(csv->)->join
workflow" in {
val expectedBatchSize = 1
- val customWorkflowSettings = WorkflowSettings(dataTransferBatchSize =
expectedBatchSize, batchProcessing = false)
+ val customWorkflowSettings = WorkflowSettings(dataTransferBatchSize =
expectedBatchSize, executionMode = ExecutionMode.STREAMING)
val context =
new WorkflowContext(workflowSettings = customWorkflowSettings)
diff --git
a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/ExecutionMode.java
b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/ExecutionMode.java
new file mode 100644
index 0000000000..3731c72b2b
--- /dev/null
+++
b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/ExecutionMode.java
@@ -0,0 +1,6 @@
+package org.apache.texera.amber.core.workflow;
+
+public enum ExecutionMode {
+ STREAMING,
+ BATCH
+}
\ No newline at end of file
diff --git
a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/PhysicalPlan.scala
b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/PhysicalPlan.scala
index 31d61edcbd..ad3f24e97b 100644
---
a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/PhysicalPlan.scala
+++
b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/PhysicalPlan.scala
@@ -39,7 +39,7 @@ import scala.jdk.CollectionConverters.{CollectionHasAsScala,
IteratorHasAsScala}
case class PhysicalPlan(
operators: Set[PhysicalOp],
links: Set[PhysicalLink],
- batchProcessing: Boolean = false
+ executionMode: ExecutionMode = ExecutionMode.STREAMING
) extends LazyLogging {
@transient private lazy val operatorMap: Map[PhysicalOpIdentity, PhysicalOp]
=
@@ -246,7 +246,7 @@ case class PhysicalPlan(
getOperator(physicalOp.id).isInputLinkDependee(
link
) ||
getOperator(upstreamPhysicalOpId).isOutputLinkBlocking(link)
- || batchProcessing
+ || executionMode == ExecutionMode.BATCH
)
}
}
diff --git
a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/WorkflowContext.scala
b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/WorkflowContext.scala
index 04ce26d7cb..1e68620ed5 100644
---
a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/WorkflowContext.scala
+++
b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/WorkflowContext.scala
@@ -31,7 +31,7 @@ object WorkflowContext {
val DEFAULT_WORKFLOW_ID: WorkflowIdentity = WorkflowIdentity(1L)
val DEFAULT_WORKFLOW_SETTINGS: WorkflowSettings = WorkflowSettings(
dataTransferBatchSize = 400,
- batchProcessing = false
+ executionMode = ExecutionMode.STREAMING
)
}
class WorkflowContext(
diff --git
a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/WorkflowSettings.scala
b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/WorkflowSettings.scala
index 17885c5db1..e71f16932f 100644
---
a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/WorkflowSettings.scala
+++
b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/WorkflowSettings.scala
@@ -19,8 +19,10 @@
package org.apache.texera.amber.core.workflow
+
+
case class WorkflowSettings(
- dataTransferBatchSize: Int,
- outputPortsNeedingStorage: Set[GlobalPortIdentity] = Set.empty,
- batchProcessing: Boolean
+ dataTransferBatchSize: Int,
+ outputPortsNeedingStorage:
Set[GlobalPortIdentity] = Set.empty,
+ executionMode: ExecutionMode
)
diff --git a/frontend/src/app/common/type/workflow.ts
b/frontend/src/app/common/type/workflow.ts
index c959183f57..7965aa9d48 100644
--- a/frontend/src/app/common/type/workflow.ts
+++ b/frontend/src/app/common/type/workflow.ts
@@ -20,9 +20,14 @@
import { WorkflowMetadata } from
"../../dashboard/type/workflow-metadata.interface";
import { CommentBox, OperatorLink, OperatorPredicate, Point } from
"../../workspace/types/workflow-common.interface";
+export enum ExecutionMode {
+ STREAMING,
+ BATCH,
+}
+
export interface WorkflowSettings {
dataTransferBatchSize: number;
- batchProcessing: boolean;
+ executionMode: ExecutionMode;
}
/**
diff --git
a/frontend/src/app/dashboard/component/user/user-workflow/user-workflow.component.ts
b/frontend/src/app/dashboard/component/user/user-workflow/user-workflow.component.ts
index 28db9a2dfe..0e706a659d 100644
---
a/frontend/src/app/dashboard/component/user/user-workflow/user-workflow.component.ts
+++
b/frontend/src/app/dashboard/component/user/user-workflow/user-workflow.component.ts
@@ -17,34 +17,38 @@
* under the License.
*/
-import { AfterViewInit, Component, Input, ViewChild } from "@angular/core";
-import { Router } from "@angular/router";
-import { NzModalService } from "ng-zorro-antd/modal";
-import { firstValueFrom, from, lastValueFrom, Observable, of } from "rxjs";
+import {AfterViewInit, Component, Input, ViewChild} from "@angular/core";
+import {Router} from "@angular/router";
+import {NzModalService} from "ng-zorro-antd/modal";
+import {firstValueFrom, from, lastValueFrom, Observable, of} from "rxjs";
import {
DEFAULT_WORKFLOW_NAME,
WorkflowPersistService,
} from "../../../../common/service/workflow-persist/workflow-persist.service";
-import { NgbdModalAddProjectWorkflowComponent } from
"../user-project/user-project-section/ngbd-modal-add-project-workflow/ngbd-modal-add-project-workflow.component";
-import { NgbdModalRemoveProjectWorkflowComponent } from
"../user-project/user-project-section/ngbd-modal-remove-project-workflow/ngbd-modal-remove-project-workflow.component";
-import { DashboardEntry, UserInfo } from "../../../type/dashboard-entry";
-import { UserService } from "../../../../common/service/user/user.service";
-import { UntilDestroy, untilDestroyed } from "@ngneat/until-destroy";
-import { NotificationService } from
"../../../../common/service/notification/notification.service";
-import { WorkflowContent } from "../../../../common/type/workflow";
-import { NzUploadFile } from "ng-zorro-antd/upload";
+import {
+ NgbdModalAddProjectWorkflowComponent
+} from
"../user-project/user-project-section/ngbd-modal-add-project-workflow/ngbd-modal-add-project-workflow.component";
+import {
+ NgbdModalRemoveProjectWorkflowComponent
+} from
"../user-project/user-project-section/ngbd-modal-remove-project-workflow/ngbd-modal-remove-project-workflow.component";
+import {DashboardEntry, UserInfo} from "../../../type/dashboard-entry";
+import {UserService} from "../../../../common/service/user/user.service";
+import {UntilDestroy, untilDestroyed} from "@ngneat/until-destroy";
+import {NotificationService} from
"../../../../common/service/notification/notification.service";
+import {ExecutionMode, WorkflowContent} from
"../../../../common/type/workflow";
+import {NzUploadFile} from "ng-zorro-antd/upload";
import * as JSZip from "jszip";
-import { FiltersComponent } from "../filters/filters.component";
-import { SearchResultsComponent } from
"../search-results/search-results.component";
-import { SearchService } from "../../../service/user/search.service";
-import { SortMethod } from "../../../type/sort-method";
-import { isDefined } from "../../../../common/util/predicate";
-import { UserProjectService } from
"../../../service/user/project/user-project.service";
-import { map, mergeMap, switchMap, tap } from "rxjs/operators";
-import { DashboardWorkflow } from "../../../type/dashboard-workflow.interface";
-import { DownloadService } from
"../../../service/user/download/download.service";
-import { DASHBOARD_USER_WORKSPACE } from "../../../../app-routing.constant";
-import { GuiConfigService } from
"../../../../common/service/gui-config.service";
+import {FiltersComponent} from "../filters/filters.component";
+import {SearchResultsComponent} from
"../search-results/search-results.component";
+import {SearchService} from "../../../service/user/search.service";
+import {SortMethod} from "../../../type/sort-method";
+import {isDefined} from "../../../../common/util/predicate";
+import {UserProjectService} from
"../../../service/user/project/user-project.service";
+import {map, mergeMap, switchMap, tap} from "rxjs/operators";
+import {DashboardWorkflow} from "../../../type/dashboard-workflow.interface";
+import {DownloadService} from
"../../../service/user/download/download.service";
+import {DASHBOARD_USER_WORKSPACE} from "../../../../app-routing.constant";
+import {GuiConfigService} from "../../../../common/service/gui-config.service";
/**
* Saved-workflow-section component contains information and functionality
@@ -232,7 +236,7 @@ export class UserWorkflowComponent implements AfterViewInit
{
operatorPositions: {},
settings: {
dataTransferBatchSize: this.config.env.defaultDataTransferBatchSize,
- batchProcessing: false,
+ executionMode: ExecutionMode.STREAMING,
},
};
let localPid = this.pid;
diff --git
a/frontend/src/app/workspace/component/left-panel/settings/settings.component.html
b/frontend/src/app/workspace/component/left-panel/settings/settings.component.html
index d069e9469f..c5a52f1c6b 100644
---
a/frontend/src/app/workspace/component/left-panel/settings/settings.component.html
+++
b/frontend/src/app/workspace/component/left-panel/settings/settings.component.html
@@ -21,14 +21,12 @@
[formGroup]="settingsForm"
class="form-inline">
<b>Execution Mode:</b>
- <nz-radio-group formControlName="batchProcessing" nzSize="small">
- <label nz-radio [nzValue]="true">Batch</label>
- <label nz-radio [nzValue]="false">Streaming</label>
+ <nz-radio-group formControlName="executionMode" >
+ <label nz-radio [nzValue]="ExecutionMode.BATCH">Batch</label>
+ <br>
+ <label nz-radio [nzValue]="ExecutionMode.STREAMING">Streaming</label>
</nz-radio-group>
<br>
- <br>
- <br>
- <br>
<div class="form-group">
<label for="dataTransferBatchSize">Data Transfer Batch Size:</label>
<input
diff --git
a/frontend/src/app/workspace/component/left-panel/settings/settings.component.ts
b/frontend/src/app/workspace/component/left-panel/settings/settings.component.ts
index 6524e3d2cf..ab0c267fa4 100644
---
a/frontend/src/app/workspace/component/left-panel/settings/settings.component.ts
+++
b/frontend/src/app/workspace/component/left-panel/settings/settings.component.ts
@@ -24,7 +24,7 @@ import { WorkflowActionService } from
"../../../service/workflow-graph/model/wor
import { WorkflowPersistService } from
"src/app/common/service/workflow-persist/workflow-persist.service";
import { UserService } from "../../../../common/service/user/user.service";
import { NotificationService } from
"src/app/common/service/notification/notification.service";
-import { GuiConfigService } from
"../../../../common/service/gui-config.service";
+import {ExecutionMode} from "../../../../common/type/workflow";
@UntilDestroy()
@Component({
@@ -34,7 +34,6 @@ import { GuiConfigService } from
"../../../../common/service/gui-config.service"
})
export class SettingsComponent implements OnInit {
settingsForm!: FormGroup;
- currentDataTransferBatchSize!: number;
constructor(
private fb: FormBuilder,
@@ -42,7 +41,6 @@ export class SettingsComponent implements OnInit {
private workflowPersistService: WorkflowPersistService,
private userService: UserService,
private notificationService: NotificationService,
- private config: GuiConfigService
) {}
ngOnInit(): void {
@@ -51,7 +49,7 @@ export class SettingsComponent implements OnInit {
this.workflowActionService.getWorkflowContent().settings.dataTransferBatchSize,
[Validators.required, Validators.min(1)],
],
- batchProcessing:
[this.workflowActionService.getWorkflowContent().settings.batchProcessing],
+ executionMode:
[this.workflowActionService.getWorkflowContent().settings.executionMode],
});
this.settingsForm
@@ -64,10 +62,10 @@ export class SettingsComponent implements OnInit {
});
this.settingsForm
- .get("batchProcessing")!
+ .get("executionMode")!
.valueChanges.pipe(untilDestroyed(this))
- .subscribe((enabled: boolean) => {
- this.updateBatchProcessing(enabled);
+ .subscribe((mode: ExecutionMode) => {
+ this.updateExecutionMode(mode);
});
this.workflowActionService
@@ -77,7 +75,7 @@ export class SettingsComponent implements OnInit {
this.settingsForm.patchValue(
{
dataTransferBatchSize:
this.workflowActionService.getWorkflowContent().settings.dataTransferBatchSize,
- batchProcessing:
this.workflowActionService.getWorkflowContent().settings.batchProcessing,
+ executionMode:
this.workflowActionService.getWorkflowContent().settings.executionMode,
},
{ emitEvent: false }
);
@@ -102,8 +100,10 @@ export class SettingsComponent implements OnInit {
});
}
- public updateBatchProcessing(enabled: boolean) {
- this.workflowActionService.updateBatchProcessing(enabled);
+ public updateExecutionMode(mode: ExecutionMode) {
+ this.workflowActionService.updateExecutionMode(mode);
this.persistWorkflow();
}
+
+ protected readonly ExecutionMode = ExecutionMode;
}
diff --git
a/frontend/src/app/workspace/service/workflow-graph/model/workflow-action.service.ts
b/frontend/src/app/workspace/service/workflow-graph/model/workflow-action.service.ts
index 4c960d027c..4b3ce64b33 100644
---
a/frontend/src/app/workspace/service/workflow-graph/model/workflow-action.service.ts
+++
b/frontend/src/app/workspace/service/workflow-graph/model/workflow-action.service.ts
@@ -17,12 +17,12 @@
* under the License.
*/
-import { Injectable } from "@angular/core";
+import {Injectable} from "@angular/core";
import * as joint from "jointjs";
-import { BehaviorSubject, merge, Observable, Subject } from "rxjs";
-import { Workflow, WorkflowContent, WorkflowSettings } from
"../../../../common/type/workflow";
-import { WorkflowMetadata } from
"../../../../dashboard/type/workflow-metadata.interface";
+import {BehaviorSubject, merge, Observable, Subject} from "rxjs";
+import {ExecutionMode, Workflow, WorkflowContent, WorkflowSettings} from
"../../../../common/type/workflow";
+import {WorkflowMetadata} from
"../../../../dashboard/type/workflow-metadata.interface";
import {
Comment,
CommentBox,
@@ -32,18 +32,18 @@ import {
Point,
PortDescription,
} from "../../../types/workflow-common.interface";
-import { JointUIService } from "../../joint-ui/joint-ui.service";
-import { OperatorMetadataService } from
"../../operator-metadata/operator-metadata.service";
-import { UndoRedoService } from "../../undo-redo/undo-redo.service";
-import { WorkflowUtilService } from "../util/workflow-util.service";
-import { JointGraphWrapper } from "./joint-graph-wrapper";
-import { SyncTexeraModel } from "./sync-texera-model";
-import { WorkflowGraph, WorkflowGraphReadonly } from "./workflow-graph";
-import { filter } from "rxjs/operators";
-import { isDefined } from "../../../../common/util/predicate";
-import { User } from "../../../../common/type/user";
-import { SharedModelChangeHandler } from "./shared-model-change-handler";
-import { GuiConfigService } from
"../../../../common/service/gui-config.service";
+import {JointUIService} from "../../joint-ui/joint-ui.service";
+import {OperatorMetadataService} from
"../../operator-metadata/operator-metadata.service";
+import {UndoRedoService} from "../../undo-redo/undo-redo.service";
+import {WorkflowUtilService} from "../util/workflow-util.service";
+import {JointGraphWrapper} from "./joint-graph-wrapper";
+import {SyncTexeraModel} from "./sync-texera-model";
+import {WorkflowGraph, WorkflowGraphReadonly} from "./workflow-graph";
+import {filter} from "rxjs/operators";
+import {isDefined} from "../../../../common/util/predicate";
+import {User} from "../../../../common/type/user";
+import {SharedModelChangeHandler} from "./shared-model-change-handler";
+import {GuiConfigService} from "../../../../common/service/gui-config.service";
export const DEFAULT_WORKFLOW_NAME = "Untitled Workflow";
export const DEFAULT_WORKFLOW = {
@@ -127,7 +127,7 @@ export class WorkflowActionService {
private getDefaultSettings(): WorkflowSettings {
return {
dataTransferBatchSize: this.config.env.defaultDataTransferBatchSize,
- batchProcessing: false,
+ executionMode: ExecutionMode.STREAMING,
};
}
@@ -808,8 +808,8 @@ export class WorkflowActionService {
}
}
- public updateBatchProcessing(enabled: boolean): void {
- this.setWorkflowSettings({ ...this.workflowSettings, batchProcessing:
enabled });
+ public updateExecutionMode(mode: ExecutionMode): void {
+ this.setWorkflowSettings({ ...this.workflowSettings, executionMode: mode
});
}
public clearWorkflow(): void {