This is an automated email from the ASF dual-hosted git repository.
dmvk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 40d2cfbc11f [FLINK-31471] Allow setting JobResourceRequirements
through WEB UI.
40d2cfbc11f is described below
commit 40d2cfbc11f4c3598dcba5cbc7367237a9ddbf2f
Author: David Moravek <[email protected]>
AuthorDate: Tue Apr 4 14:49:58 2023 +0200
[FLINK-31471] Allow setting JobResourceRequirements through WEB UI.
---
.../shortcodes/generated/rest_v1_dispatcher.html | 3 +
.../shortcodes/generated/web_configuration.html | 6 ++
docs/static/generated/rest_v1_dispatcher.yml | 2 +
.../apache/flink/configuration/ClusterOptions.java | 2 +-
.../org/apache/flink/configuration/WebOptions.java | 10 +-
.../runtime/webmonitor/history/HistoryServer.java | 1 +
.../src/test/resources/rest_api_v1.snapshot | 3 +
.../src/app/interfaces/configuration.ts | 1 +
...nfiguration.ts => job-resource-requirements.ts} | 30 ++----
.../pages/job/overview/job-overview.component.html | 1 +
.../pages/job/overview/job-overview.component.ts | 14 ++-
.../overview/list/job-overview-list.component.html | 37 +++++++-
.../overview/list/job-overview-list.component.ts | 61 +++++++++++-
.../src/app/services/job.service.spec.ts | 102 +++++++++++++++++++++
.../web-dashboard/src/app/services/job.service.ts | 32 ++++++-
.../rest/handler/RestHandlerConfiguration.java | 19 +++-
.../handler/cluster/DashboardConfigHandler.java | 4 +-
.../rest/messages/DashboardConfiguration.java | 16 +++-
.../runtime/webmonitor/WebMonitorEndpoint.java | 3 +-
.../rest/handler/RestHandlerConfigurationTest.java | 55 +++++++++--
.../rest/messages/DashboardConfigurationTest.java | 2 +-
21 files changed, 353 insertions(+), 51 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
b/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
index 4d446c68a06..ae11c7dc4f1 100644
--- a/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
+++ b/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
@@ -70,6 +70,9 @@
"web-history" : {
"type" : "boolean"
},
+ "web-rescale" : {
+ "type" : "boolean"
+ },
"web-submit" : {
"type" : "boolean"
}
diff --git a/docs/layouts/shortcodes/generated/web_configuration.html
b/docs/layouts/shortcodes/generated/web_configuration.html
index 693580ba83b..7ac55f5685c 100644
--- a/docs/layouts/shortcodes/generated/web_configuration.html
+++ b/docs/layouts/shortcodes/generated/web_configuration.html
@@ -50,6 +50,12 @@
<td>Long</td>
<td>Refresh interval for the web-frontend in milliseconds.</td>
</tr>
+ <tr>
+ <td><h5>web.rescale.enable</h5></td>
+ <td style="word-wrap: break-word;">true</td>
+ <td>Boolean</td>
+ <td>Flag indicating whether jobs can be rescaled from the
web-frontend.</td>
+ </tr>
<tr>
<td><h5>web.submit.enable</h5></td>
<td style="word-wrap: break-word;">true</td>
diff --git a/docs/static/generated/rest_v1_dispatcher.yml
b/docs/static/generated/rest_v1_dispatcher.yml
index 2e8d49145a2..0135b51469a 100644
--- a/docs/static/generated/rest_v1_dispatcher.yml
+++ b/docs/static/generated/rest_v1_dispatcher.yml
@@ -2090,6 +2090,8 @@ components:
type: boolean
web-history:
type: boolean
+ web-rescale:
+ type: boolean
web-submit:
type: boolean
GarbageCollectorInfo:
diff --git
a/flink-core/src/main/java/org/apache/flink/configuration/ClusterOptions.java
b/flink-core/src/main/java/org/apache/flink/configuration/ClusterOptions.java
index 2785f598dbb..92678f82465 100644
---
a/flink-core/src/main/java/org/apache/flink/configuration/ClusterOptions.java
+++
b/flink-core/src/main/java/org/apache/flink/configuration/ClusterOptions.java
@@ -213,7 +213,7 @@ public class ClusterOptions {
}
}
- private static boolean isReactiveModeEnabled(Configuration configuration) {
+ public static boolean isReactiveModeEnabled(Configuration configuration) {
return configuration.get(JobManagerOptions.SCHEDULER_MODE)
== SchedulerExecutionMode.REACTIVE;
}
diff --git
a/flink-core/src/main/java/org/apache/flink/configuration/WebOptions.java
b/flink-core/src/main/java/org/apache/flink/configuration/WebOptions.java
index ebc9ac4e85b..e40adcbaaaa 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/WebOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/WebOptions.java
@@ -133,7 +133,7 @@ public class WebOptions {
.withDescription(
"Flag indicating whether jobs can be uploaded and
run from the web-frontend.");
- /** Config parameter indicating whether jobs can be cancel from the
web-frontend. */
+ /** Config parameter indicating whether jobs can be canceled from the
web-frontend. */
public static final ConfigOption<Boolean> CANCEL_ENABLE =
key("web.cancel.enable")
.booleanType()
@@ -141,6 +141,14 @@ public class WebOptions {
.withDescription(
"Flag indicating whether jobs can be canceled from
the web-frontend.");
+ /** Config parameter indicating whether jobs can be rescaled from the
web-frontend. */
+ public static final ConfigOption<Boolean> RESCALE_ENABLE =
+ key("web.rescale.enable")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription(
+ "Flag indicating whether jobs can be rescaled from
the web-frontend.");
+
/** Config parameter defining the number of checkpoints to remember for
recent history. */
public static final ConfigOption<Integer> CHECKPOINTS_HISTORY_SIZE =
key("web.checkpoints.history")
diff --git
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
index fff6d42306f..0f170d9d79a 100644
---
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
+++
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
@@ -379,6 +379,7 @@ public class HistoryServer {
ZonedDateTime.now(),
false,
false,
+ false,
true)));
fw.flush();
} catch (IOException ioe) {
diff --git a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
index 04f092a2f8b..20d01ddea8a 100644
--- a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
+++ b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
@@ -62,6 +62,9 @@
"web-cancel" : {
"type" : "boolean"
},
+ "web-rescale" : {
+ "type" : "boolean"
+ },
"web-history" : {
"type" : "boolean"
}
diff --git
a/flink-runtime-web/web-dashboard/src/app/interfaces/configuration.ts
b/flink-runtime-web/web-dashboard/src/app/interfaces/configuration.ts
index dac450e3c1c..a500a729a15 100644
--- a/flink-runtime-web/web-dashboard/src/app/interfaces/configuration.ts
+++ b/flink-runtime-web/web-dashboard/src/app/interfaces/configuration.ts
@@ -42,5 +42,6 @@ export interface Configuration {
'web-history': boolean;
'web-submit': boolean;
'web-cancel': boolean;
+ 'web-rescale': boolean;
};
}
diff --git
a/flink-runtime-web/web-dashboard/src/app/interfaces/configuration.ts
b/flink-runtime-web/web-dashboard/src/app/interfaces/job-resource-requirements.ts
similarity index 61%
copy from flink-runtime-web/web-dashboard/src/app/interfaces/configuration.ts
copy to
flink-runtime-web/web-dashboard/src/app/interfaces/job-resource-requirements.ts
index dac450e3c1c..104cf81e4e4 100644
--- a/flink-runtime-web/web-dashboard/src/app/interfaces/configuration.ts
+++
b/flink-runtime-web/web-dashboard/src/app/interfaces/job-resource-requirements.ts
@@ -16,31 +16,13 @@
* limitations under the License.
*/
-export interface JvmInfo {
- version: string;
- arch: string;
- options: string[];
+export interface JobResourceRequirements {
+ [key: string]: JobVertexResourceRequirements;
}
-export interface EnvironmentInfo {
- jvm: JvmInfo;
- classpath: string[];
-}
-
-export interface ClusterConfiguration {
- key: string;
- value: string;
-}
-
-export interface Configuration {
- 'refresh-interval': number;
- 'timezone-name': string;
- 'timezone-offset': number;
- 'flink-version': string;
- 'flink-revision': string;
- features: {
- 'web-history': boolean;
- 'web-submit': boolean;
- 'web-cancel': boolean;
+export interface JobVertexResourceRequirements {
+ parallelism: {
+ lowerBound: number;
+ upperBound: number;
};
}
diff --git
a/flink-runtime-web/web-dashboard/src/app/pages/job/overview/job-overview.component.html
b/flink-runtime-web/web-dashboard/src/app/pages/job/overview/job-overview.component.html
index 533540317d9..53e5e487a08 100644
---
a/flink-runtime-web/web-dashboard/src/app/pages/job/overview/job-overview.component.html
+++
b/flink-runtime-web/web-dashboard/src/app/pages/job/overview/job-overview.component.html
@@ -30,6 +30,7 @@
<ng-container *ngIf="nodes.length > 0">
<flink-job-overview-list
(nodeClick)="onNodeClick($event)"
+ (rescale)="onRescale($event)"
[nodes]="nodes"
[selectedNode]="selectedNode"
></flink-job-overview-list>
diff --git
a/flink-runtime-web/web-dashboard/src/app/pages/job/overview/job-overview.component.ts
b/flink-runtime-web/web-dashboard/src/app/pages/job/overview/job-overview.component.ts
index 28e5d15d879..10f7c034b40 100644
---
a/flink-runtime-web/web-dashboard/src/app/pages/job/overview/job-overview.component.ts
+++
b/flink-runtime-web/web-dashboard/src/app/pages/job/overview/job-overview.component.ts
@@ -34,8 +34,9 @@ import { DagreComponent } from
'@flink-runtime-web/components/dagre/dagre.compon
import { ResizeComponent } from
'@flink-runtime-web/components/resize/resize.component';
import { NodesItemCorrect, NodesItemLink } from
'@flink-runtime-web/interfaces';
import { JobOverviewListComponent } from
'@flink-runtime-web/pages/job/overview/list/job-overview-list.component';
-import { MetricsService } from '@flink-runtime-web/services';
+import { JobService, MetricsService } from '@flink-runtime-web/services';
import { NzAlertModule } from 'ng-zorro-antd/alert';
+import { NzNotificationService } from 'ng-zorro-antd/notification';
import { JobLocalService } from '../job-local.service';
@@ -65,6 +66,8 @@ export class JobOverviewComponent implements OnInit,
OnDestroy {
public readonly elementRef: ElementRef,
private readonly metricService: MetricsService,
private readonly jobLocalService: JobLocalService,
+ private readonly jobService: JobService,
+ private readonly notificationService: NzNotificationService,
private readonly cdr: ChangeDetectorRef
) {}
@@ -115,6 +118,15 @@ export class JobOverviewComponent implements OnInit,
OnDestroy {
}
}
+ public onRescale(desiredParallelism: Map<string, number>): void {
+ this.jobService.changeDesiredParallelism(this.jobId,
desiredParallelism).subscribe(() => {
+ this.notificationService.success(
+ 'Rescaling operation.',
+ 'Job resources requirements have been updated. Job will now try to
rescale.'
+ );
+ });
+ }
+
public onResizeEnd(): void {
if (!this.selectedNode) {
this.dagreComponent.moveToCenter();
diff --git
a/flink-runtime-web/web-dashboard/src/app/pages/job/overview/list/job-overview-list.component.html
b/flink-runtime-web/web-dashboard/src/app/pages/job/overview/list/job-overview-list.component.html
index 873c7c53231..c6a4f9ae92c 100644
---
a/flink-runtime-web/web-dashboard/src/app/pages/job/overview/list/job-overview-list.component.html
+++
b/flink-runtime-web/web-dashboard/src/app/pages/job/overview/list/job-overview-list.component.html
@@ -33,11 +33,12 @@
<th [nzSortFn]="sortReadRecordsFn" nzWidth="150px">Records Received</th>
<th [nzSortFn]="sortWriteBytesFn" nzWidth="150px">Bytes Sent</th>
<th [nzSortFn]="sortWriteRecordsFn" nzWidth="120px">Records Sent</th>
- <th [nzSortFn]="sortParallelismFn" nzWidth="120px">Parallelism</th>
+ <th [nzSortFn]="sortParallelismFn" nzWidth="100px">Parallelism</th>
<th [nzSortFn]="sortStartTimeFn" nzWidth="150px">Start Time</th>
<th [nzSortFn]="sortDurationFn" nzWidth="150px">Duration</th>
<th [nzSortFn]="sortEndTimeFn" nzWidth="150px">End Time</th>
- <th nzWidth="100px" nzRight>Tasks</th>
+ <th nzWidth="60px" nzRight>Tasks</th>
+ <th *ngIf="webRescaleEnabled" nzWidth="80px" nzRight>Scale</th>
</tr>
</thead>
<tbody>
@@ -95,13 +96,43 @@
{{ node.detail.metrics['write-records'] | number: '1.0-0' }}
</span>
</td>
- <td>{{ node.parallelism }}</td>
+ <td>
+ <span>
+ {{ node.parallelism }}
+ </span>
+ <span *ngIf="desiredParallelism.has(node.id)">
+ <b>
+ <i nz-icon nzType="sync"></i>
+ {{ desiredParallelism.get(node.id) }}
+ </b>
+ </span>
+ </td>
<td>{{ node.detail['start-time'] | humanizeDate: 'yyyy-MM-dd HH:mm:ss'
}}</td>
<td>{{ node.detail?.duration | humanizeDuration }}</td>
<td>{{ node.detail['end-time'] | humanizeDate: 'yyyy-MM-dd HH:mm:ss'
}}</td>
<td nzRight [class.selected]="selectedNode?.id === node.id">
<flink-task-badge [tasks]="node.detail?.tasks"></flink-task-badge>
</td>
+ <td *ngIf="webRescaleEnabled" nzRight>
+ <nz-button-group>
+ <button
+ nz-button
+ nzSize="small"
+ nzType="default"
+ (click)="clickScaleUp(node); $event.stopPropagation()"
+ >
+ <span nz-icon nzType="plus"></span>
+ </button>
+ <button
+ nz-button
+ nzSize="small"
+ nzType="default"
+ (click)="clickScaleDown(node); $event.stopPropagation()"
+ >
+ <span nz-icon nzType="minus"></span>
+ </button>
+ </nz-button-group>
+ </td>
</tr>
</tbody>
</nz-table>
diff --git
a/flink-runtime-web/web-dashboard/src/app/pages/job/overview/list/job-overview-list.component.ts
b/flink-runtime-web/web-dashboard/src/app/pages/job/overview/list/job-overview-list.component.ts
index 498f10a852d..ad632360118 100644
---
a/flink-runtime-web/web-dashboard/src/app/pages/job/overview/list/job-overview-list.component.ts
+++
b/flink-runtime-web/web-dashboard/src/app/pages/job/overview/list/job-overview-list.component.ts
@@ -26,6 +26,10 @@ import { JobBadgeComponent } from
'@flink-runtime-web/components/job-badge/job-b
import { ResizeComponent } from
'@flink-runtime-web/components/resize/resize.component';
import { TaskBadgeComponent } from
'@flink-runtime-web/components/task-badge/task-badge.component';
import { NodesItemCorrect } from '@flink-runtime-web/interfaces';
+import { StatusService } from '@flink-runtime-web/services';
+import { NzBadgeModule } from 'ng-zorro-antd/badge';
+import { NzButtonModule } from 'ng-zorro-antd/button';
+import { NzIconModule } from 'ng-zorro-antd/icon';
import { NzTableModule } from 'ng-zorro-antd/table';
import { NzTableSortFn } from 'ng-zorro-antd/table/src/table.types';
import { NzToolTipModule } from 'ng-zorro-antd/tooltip';
@@ -36,6 +40,8 @@ function createSortFn(
return (pre, next) => (selector(pre)! > selector(next)! ? 1 : -1);
}
+const rescaleTimeout = 2500;
+
@Component({
selector: 'flink-job-overview-list',
templateUrl: './job-overview-list.component.html',
@@ -52,12 +58,16 @@ function createSortFn(
HumanizeDatePipe,
HumanizeDurationPipe,
TaskBadgeComponent,
- ResizeComponent
+ ResizeComponent,
+ NzButtonModule,
+ NzIconModule,
+ NzBadgeModule
],
standalone: true
})
export class JobOverviewListComponent {
public readonly trackById = (_: number, node: NodesItemCorrect): string =>
node.id;
+ public readonly webRescaleEnabled =
this.statusService.configuration.features['web-rescale'];
public readonly sortStatusFn = createSortFn(item => item.detail?.status);
public readonly sortReadBytesFn = createSortFn(item =>
item.detail?.metrics?.['read-bytes']);
@@ -70,26 +80,69 @@ export class JobOverviewListComponent {
public readonly sortEndTimeFn = createSortFn(item =>
item.detail?.['end-time']);
public innerNodes: NodesItemCorrect[] = [];
- public sortName: string;
- public sortValue: string;
public left = 390;
+ public desiredParallelism = new Map<string, number>();
+
+ public rescaleTimeoutId: number | undefined;
+
@Output() public readonly nodeClick = new EventEmitter<NodesItemCorrect>();
+ @Output() public readonly rescale = new EventEmitter<Map<string, number>>();
+
@Input() public selectedNode: NodesItemCorrect;
@Input()
public set nodes(value: NodesItemCorrect[]) {
this.innerNodes = value;
+ for (const node of value) {
+ if (node.parallelism == this.desiredParallelism.get(node.id)) {
+ this.desiredParallelism.delete(node.id);
+ }
+ }
}
public get nodes(): NodesItemCorrect[] {
return this.innerNodes;
}
- constructor(public readonly elementRef: ElementRef) {}
+ constructor(public readonly elementRef: ElementRef, private readonly
statusService: StatusService) {}
public clickNode(node: NodesItemCorrect): void {
this.nodeClick.emit(node);
}
+
+ public clickScaleUp(node: NodesItemCorrect): void {
+ let currentDesiredParallelism = this.desiredParallelism.get(node.id);
+ if (currentDesiredParallelism == undefined) {
+ currentDesiredParallelism = node.parallelism;
+ }
+ const newDesiredParallelism = currentDesiredParallelism + 1;
+ this.changeDesiredParallelism(node, newDesiredParallelism);
+ }
+
+ public clickScaleDown(node: NodesItemCorrect): void {
+ let currentDesiredParallelism = this.desiredParallelism.get(node.id);
+ if (currentDesiredParallelism == undefined) {
+ currentDesiredParallelism = node.parallelism;
+ }
+ const newDesiredParallelism = Math.max(1, currentDesiredParallelism - 1);
+ this.changeDesiredParallelism(node, newDesiredParallelism);
+ }
+
+ private changeDesiredParallelism(node: NodesItemCorrect,
newDesiredParallelism: number): void {
+ if (newDesiredParallelism == node.parallelism) {
+ this.desiredParallelism.delete(node.id);
+ } else {
+ this.desiredParallelism.set(node.id, newDesiredParallelism);
+ }
+ if (this.rescaleTimeoutId != undefined) {
+ window.clearTimeout(this.rescaleTimeoutId);
+ }
+ this.rescaleTimeoutId = window.setTimeout(() => {
+ if (this.desiredParallelism.size > 0) {
+ this.rescale.emit(this.desiredParallelism);
+ }
+ }, rescaleTimeout);
+ }
}
diff --git
a/flink-runtime-web/web-dashboard/src/app/services/job.service.spec.ts
b/flink-runtime-web/web-dashboard/src/app/services/job.service.spec.ts
new file mode 100644
index 00000000000..db7613b4f5c
--- /dev/null
+++ b/flink-runtime-web/web-dashboard/src/app/services/job.service.spec.ts
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import { HttpClientTestingModule, HttpTestingController } from
'@angular/common/http/testing';
+import { TestBed } from '@angular/core/testing';
+
+import { JobResourceRequirements } from
'@flink-runtime-web/interfaces/job-resource-requirements';
+import { ConfigService } from '@flink-runtime-web/services/config.service';
+import { JobService } from '@flink-runtime-web/services/job.service';
+
+const clone = function (jobResourceRequirements: JobResourceRequirements):
JobResourceRequirements {
+ return JSON.parse(JSON.stringify(jobResourceRequirements));
+};
+
+describe('Job Service', () => {
+ let configService: ConfigService;
+ let jobService: JobService;
+ let httpTestingController: HttpTestingController;
+
+ beforeEach(() => {
+ TestBed.configureTestingModule({
+ imports: [HttpClientTestingModule],
+ providers: [{ provide: ConfigService, useValue: new ConfigService() }]
+ });
+
+ configService = TestBed.inject(ConfigService);
+ jobService = TestBed.inject(JobService);
+ httpTestingController = TestBed.inject(HttpTestingController);
+ });
+
+ it('#changeDesiredParallelism', done => {
+ const jobId = 'apache-flink';
+ const jobResourceRequirements: JobResourceRequirements = {
+ firstVertex: {
+ parallelism: {
+ lowerBound: 1,
+ upperBound: 1
+ }
+ },
+ secondVertex: {
+ parallelism: {
+ lowerBound: 1,
+ upperBound: 2
+ }
+ },
+ thirdVertex: {
+ parallelism: {
+ lowerBound: 1,
+ upperBound: 3
+ }
+ }
+ };
+ const desiredParallelism = new Map<string, number>();
+ desiredParallelism.set('firstVertex', 3);
+ desiredParallelism.set('secondVertex', 2);
+ desiredParallelism.set('thirdVertex', 1);
+ jobService.changeDesiredParallelism(jobId,
desiredParallelism).subscribe(() => {
+ expect(true).toBe(true);
+ done();
+ });
+
+ const firstRequest = httpTestingController.expectOne({
+ method: 'GET',
+ url: `${configService.BASE_URL}/jobs/${jobId}/resource-requirements`
+ });
+ firstRequest.flush(clone(jobResourceRequirements));
+
+ const expected = clone(jobResourceRequirements);
+ for (const k of desiredParallelism.keys()) {
+ const newUpperBound = desiredParallelism.get(k);
+ if (newUpperBound != undefined) {
+ expected[k].parallelism.upperBound = newUpperBound;
+ }
+ }
+
+ const secondRequest = httpTestingController.expectOne(request => {
+ let matches = true;
+ matches = matches && request.method == 'PUT';
+ matches = matches && request.url ==
`${configService.BASE_URL}/jobs/${jobId}/resource-requirements`;
+ if (matches) {
+ expect(request.body).toEqual(expected);
+ }
+ return true;
+ });
+ secondRequest.flush(expected);
+ });
+});
diff --git a/flink-runtime-web/web-dashboard/src/app/services/job.service.ts
b/flink-runtime-web/web-dashboard/src/app/services/job.service.ts
index 21eb73bc3f4..ef8c221b323 100644
--- a/flink-runtime-web/web-dashboard/src/app/services/job.service.ts
+++ b/flink-runtime-web/web-dashboard/src/app/services/job.service.ts
@@ -18,7 +18,7 @@
import { HttpClient } from '@angular/common/http';
import { Injectable } from '@angular/core';
-import { EMPTY, forkJoin, Observable } from 'rxjs';
+import { EMPTY, forkJoin, mergeMap, Observable } from 'rxjs';
import { catchError, map } from 'rxjs/operators';
import {
@@ -44,6 +44,7 @@ import {
VerticesLink,
JobVertexSubTaskDetail
} from '@flink-runtime-web/interfaces';
+import { JobResourceRequirements } from
'@flink-runtime-web/interfaces/job-resource-requirements';
import { ConfigService } from './config.service';
@@ -175,6 +176,35 @@ export class JobService {
);
}
+ public loadJobResourceRequirements(jobId: string):
Observable<JobResourceRequirements> {
+ return this.httpClient.get<JobResourceRequirements>(
+ `${this.configService.BASE_URL}/jobs/${jobId}/resource-requirements`
+ );
+ }
+
+ public changeDesiredParallelism(jobId: string, desiredParallelism:
Map<string, number>): Observable<void> {
+ return this.loadJobResourceRequirements(jobId)
+ .pipe(
+ map(jobResourceRequirements => {
+ for (const vertexId in jobResourceRequirements) {
+ const newUpperBound = desiredParallelism.get(vertexId);
+ if (newUpperBound != undefined) {
+ jobResourceRequirements[vertexId].parallelism.upperBound =
newUpperBound;
+ }
+ }
+ return jobResourceRequirements;
+ })
+ )
+ .pipe(
+ mergeMap(jobResourceRequirements => {
+ return this.httpClient.put<void>(
+
`${this.configService.BASE_URL}/jobs/${jobId}/resource-requirements`,
+ jobResourceRequirements
+ );
+ })
+ );
+ }
+
/** nodes to nodes links in order to generate graph */
private convertJob(job: JobDetail): JobDetailCorrect {
const links: VerticesLink[] = [];
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerConfiguration.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerConfiguration.java
index c163d802008..9c8de50b21c 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerConfiguration.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerConfiguration.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.rest.handler;
import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.ClusterOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.util.Preconditions;
@@ -40,13 +41,16 @@ public class RestHandlerConfiguration {
private final boolean webCancelEnabled;
+ private final boolean webRescaleEnabled;
+
public RestHandlerConfiguration(
long refreshInterval,
int maxCheckpointStatisticCacheEntries,
Time timeout,
File webUiDir,
boolean webSubmitEnabled,
- boolean webCancelEnabled) {
+ boolean webCancelEnabled,
+ boolean webRescaleEnabled) {
Preconditions.checkArgument(
refreshInterval > 0L, "The refresh interval (ms) should be
larger than 0.");
this.refreshInterval = refreshInterval;
@@ -57,6 +61,7 @@ public class RestHandlerConfiguration {
this.webUiDir = Preconditions.checkNotNull(webUiDir);
this.webSubmitEnabled = webSubmitEnabled;
this.webCancelEnabled = webCancelEnabled;
+ this.webRescaleEnabled = webRescaleEnabled;
}
public long getRefreshInterval() {
@@ -83,6 +88,10 @@ public class RestHandlerConfiguration {
return webCancelEnabled;
}
+ public boolean isWebRescaleEnabled() {
+ return webRescaleEnabled;
+ }
+
public static RestHandlerConfiguration fromConfiguration(Configuration
configuration) {
final long refreshInterval =
configuration.getLong(WebOptions.REFRESH_INTERVAL);
@@ -96,6 +105,11 @@ public class RestHandlerConfiguration {
final boolean webSubmitEnabled =
configuration.getBoolean(WebOptions.SUBMIT_ENABLE);
final boolean webCancelEnabled =
configuration.getBoolean(WebOptions.CANCEL_ENABLE);
+ final boolean webRescaleSupported =
+ ClusterOptions.isAdaptiveSchedulerEnabled(configuration)
+ &&
!ClusterOptions.isReactiveModeEnabled(configuration);
+ final boolean webRescaleEnabled =
+ webRescaleSupported &&
configuration.getBoolean(WebOptions.RESCALE_ENABLE);
return new RestHandlerConfiguration(
refreshInterval,
@@ -103,6 +117,7 @@ public class RestHandlerConfiguration {
timeout,
webUiDir,
webSubmitEnabled,
- webCancelEnabled);
+ webCancelEnabled,
+ webRescaleEnabled);
}
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/cluster/DashboardConfigHandler.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/cluster/DashboardConfigHandler.java
index caae3c14f92..d456ea07811 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/cluster/DashboardConfigHandler.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/cluster/DashboardConfigHandler.java
@@ -49,7 +49,8 @@ public class DashboardConfigHandler
messageHeaders,
long refreshInterval,
boolean webSubmitEnabled,
- boolean webCancelEnabled) {
+ boolean webCancelEnabled,
+ boolean webRescaleEnabled) {
super(leaderRetriever, timeout, responseHeaders, messageHeaders);
dashboardConfiguration =
@@ -58,6 +59,7 @@ public class DashboardConfigHandler
ZonedDateTime.now(),
webSubmitEnabled,
webCancelEnabled,
+ webRescaleEnabled,
false);
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/DashboardConfiguration.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/DashboardConfiguration.java
index b3cfa4fb47d..6fa883d66a4 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/DashboardConfiguration.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/DashboardConfiguration.java
@@ -48,6 +48,8 @@ public class DashboardConfiguration implements ResponseBody {
public static final String FIELD_NAME_FEATURE_WEB_CANCEL = "web-cancel";
+ public static final String FIELD_NAME_FEATURE_WEB_RESCALE = "web-rescale";
+
public static final String FIELD_NAME_FEATURE_WEB_HISTORY = "web-history";
@JsonProperty(FIELD_NAME_REFRESH_INTERVAL)
@@ -123,6 +125,9 @@ public class DashboardConfiguration implements ResponseBody
{
@JsonProperty(FIELD_NAME_FEATURE_WEB_CANCEL)
private final boolean webCancelEnabled;
+ @JsonProperty(FIELD_NAME_FEATURE_WEB_RESCALE)
+ private final boolean webRescaleEnabled;
+
@JsonProperty(FIELD_NAME_FEATURE_WEB_HISTORY)
private final boolean isHistoryServer;
@@ -130,9 +135,11 @@ public class DashboardConfiguration implements
ResponseBody {
public Features(
@JsonProperty(FIELD_NAME_FEATURE_WEB_SUBMIT) boolean
webSubmitEnabled,
@JsonProperty(FIELD_NAME_FEATURE_WEB_CANCEL) boolean
webCancelEnabled,
+ @JsonProperty(FIELD_NAME_FEATURE_WEB_RESCALE) boolean
webRescaleEnabled,
@JsonProperty(FIELD_NAME_FEATURE_WEB_HISTORY) boolean
isHistoryServer) {
this.webSubmitEnabled = webSubmitEnabled;
this.webCancelEnabled = webCancelEnabled;
+ this.webRescaleEnabled = webRescaleEnabled;
this.isHistoryServer = isHistoryServer;
}
@@ -146,6 +153,11 @@ public class DashboardConfiguration implements
ResponseBody {
return webCancelEnabled;
}
+ @JsonIgnore
+ public boolean isWebRescaleEnabled() {
+ return webRescaleEnabled;
+ }
+
@JsonIgnore
public boolean isHistoryServer() {
return isHistoryServer;
@@ -204,6 +216,7 @@ public class DashboardConfiguration implements ResponseBody
{
ZonedDateTime zonedDateTime,
boolean webSubmitEnabled,
boolean webCancelEnabled,
+ boolean webRescaleEnabled,
boolean isHistoryServer) {
final String flinkVersion = EnvironmentInformation.getVersion();
@@ -226,6 +239,7 @@ public class DashboardConfiguration implements ResponseBody
{
zonedDateTime.toOffsetDateTime().getOffset().getTotalSeconds()
* 1000,
flinkVersion,
flinkRevision,
- new Features(webSubmitEnabled, webCancelEnabled,
isHistoryServer));
+ new Features(
+ webSubmitEnabled, webCancelEnabled, webRescaleEnabled,
isHistoryServer));
}
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
index 7836d720237..5193e08d85e 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
@@ -298,7 +298,8 @@ public class WebMonitorEndpoint<T extends RestfulGateway>
extends RestServerEndp
DashboardConfigurationHeaders.getInstance(),
restConfiguration.getRefreshInterval(),
hasWebSubmissionHandlers,
- restConfiguration.isWebCancelEnabled());
+ restConfiguration.isWebCancelEnabled(),
+ restConfiguration.isWebRescaleEnabled());
JobIdsHandler jobIdsHandler =
new JobIdsHandler(
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/RestHandlerConfigurationTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/RestHandlerConfigurationTest.java
index 43126e5a0d2..67454e9b1ae 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/RestHandlerConfigurationTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/RestHandlerConfigurationTest.java
@@ -19,43 +19,78 @@
package org.apache.flink.runtime.rest.handler;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.SchedulerExecutionMode;
import org.apache.flink.configuration.WebOptions;
-import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.TestLoggerExtension;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
-import static org.junit.Assert.assertEquals;
+import static org.assertj.core.api.Assertions.assertThat;
/** Tests for {@link RestHandlerConfiguration}. */
-public class RestHandlerConfigurationTest extends TestLogger {
+@ExtendWith(TestLoggerExtension.class)
+class RestHandlerConfigurationTest {
@Test
- public void testWebSubmitFeatureFlagEnabled() {
+ void testWebSubmitFeatureFlagEnabled() {
testWebSubmitFeatureFlag(true);
}
@Test
- public void testWebSubmitFeatureFlagDisabled() {
+ void testWebSubmitFeatureFlagDisabled() {
testWebSubmitFeatureFlag(false);
}
@Test
- public void testWebCancelFeatureFlagEnabled() {
+ void testWebCancelFeatureFlagEnabled() {
testWebCancelFeatureFlag(true);
}
@Test
- public void testWebCancelFeatureFlagDisabled() {
+ void testWebCancelFeatureFlagDisabled() {
testWebCancelFeatureFlag(false);
}
+ @ParameterizedTest
+ @CsvSource({
+ "true,true,true,false",
+ "true,true,false,true",
+ "true,false,true,false",
+ "true,false,false,false",
+ "false,true,true,false",
+ "false,true,false,false",
+ "false,false,true,false",
+ "false,false,false,false",
+ })
+ void testWebRescaleFeatureFlagWithReactiveMode(
+ boolean webRescaleEnabled,
+ boolean adaptiveScheduler,
+ boolean reactiveMode,
+ boolean expectedResult) {
+ final Configuration config = new Configuration();
+ config.setBoolean(WebOptions.RESCALE_ENABLE, webRescaleEnabled);
+ if (adaptiveScheduler) {
+ config.set(JobManagerOptions.SCHEDULER,
JobManagerOptions.SchedulerType.Adaptive);
+ }
+ if (reactiveMode) {
+ config.set(JobManagerOptions.SCHEDULER_MODE,
SchedulerExecutionMode.REACTIVE);
+ }
+ RestHandlerConfiguration restHandlerConfiguration =
+ RestHandlerConfiguration.fromConfiguration(config);
+
assertThat(restHandlerConfiguration.isWebRescaleEnabled()).isEqualTo(expectedResult);
+ }
+
private static void testWebSubmitFeatureFlag(boolean webSubmitEnabled) {
final Configuration config = new Configuration();
config.setBoolean(WebOptions.SUBMIT_ENABLE, webSubmitEnabled);
RestHandlerConfiguration restHandlerConfiguration =
RestHandlerConfiguration.fromConfiguration(config);
- assertEquals(webSubmitEnabled,
restHandlerConfiguration.isWebSubmitEnabled());
+
assertThat(restHandlerConfiguration.isWebSubmitEnabled()).isEqualTo(webSubmitEnabled);
}
private static void testWebCancelFeatureFlag(boolean webCancelEnabled) {
@@ -64,6 +99,6 @@ public class RestHandlerConfigurationTest extends TestLogger {
RestHandlerConfiguration restHandlerConfiguration =
RestHandlerConfiguration.fromConfiguration(config);
- assertEquals(webCancelEnabled,
restHandlerConfiguration.isWebCancelEnabled());
+
assertThat(restHandlerConfiguration.isWebCancelEnabled()).isEqualTo(webCancelEnabled);
}
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/DashboardConfigurationTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/DashboardConfigurationTest.java
index aec97d61c72..eeedeb5b749 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/DashboardConfigurationTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/DashboardConfigurationTest.java
@@ -35,6 +35,6 @@ public class DashboardConfigurationTest
42,
"version",
"revision",
- new DashboardConfiguration.Features(true, true, false));
+ new DashboardConfiguration.Features(true, true, true, false));
}
}