This is an automated email from the ASF dual-hosted git repository.
hong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 080efb9c410 [FLINK-34025] Add data skew metric endpoint and UI
080efb9c410 is described below
commit 080efb9c410102a5d12d31bb2af5a3faa3391736
Author: Emre Kartoglu <[email protected]>
AuthorDate: Thu Apr 4 12:02:21 2024 +0100
[FLINK-34025] Add data skew metric endpoint and UI
---
.../dagre/components/node/node.component.html | 3 +
.../dagre/components/node/node.component.ts | 4 +-
.../web-dashboard/src/app/interfaces/job-detail.ts | 1 +
.../src/app/interfaces/job-metrics.ts | 12 +++
.../pages/job/dataskew/data-skew.component.html | 68 +++++++++++++
.../job/dataskew/data-skew.component.less} | 25 +++--
.../app/pages/job/dataskew/data-skew.component.ts | 106 +++++++++++++++++++++
.../web-dashboard/src/app/pages/job/job.config.ts | 1 +
.../app/pages/job/modules/completed-job/routes.ts | 8 ++
.../app/pages/job/modules/running-job/routes.ts | 8 ++
.../pages/job/overview/job-overview.component.ts | 15 ++-
.../src/app/services/metrics.service.ts | 82 +++++++++++-----
.../metrics/AbstractAggregatingMetricsHandler.java | 30 +++++-
.../handler/job/metrics/DoubleAccumulator.java | 69 ++++++++++++++
.../messages/job/metrics/AggregatedMetric.java | 20 +++-
.../job/metrics/MetricsAggregationParameter.java | 3 +-
.../metrics/AggregatingMetricsHandlerTestBase.java | 34 +++++++
.../handler/job/metrics/DoubleAccumulatorTest.java | 72 ++++++++++++++
18 files changed, 509 insertions(+), 52 deletions(-)
diff --git
a/flink-runtime-web/web-dashboard/src/app/components/dagre/components/node/node.component.html
b/flink-runtime-web/web-dashboard/src/app/components/dagre/components/node/node.component.html
index 6add6d60ea6..1cf2b255922 100644
---
a/flink-runtime-web/web-dashboard/src/app/components/dagre/components/node/node.component.html
+++
b/flink-runtime-web/web-dashboard/src/app/components/dagre/components/node/node.component.html
@@ -41,6 +41,9 @@
<xhtml:div class="node-label metric" title="Maximum busy percentage
across all subtasks">
Busy (max): {{ prettyPrint(busyPercentage) }}
</xhtml:div>
+ <xhtml:div class="node-label metric" title="Data skew percentage
across all subtasks">
+ Data Skew: {{ prettyPrint(dataSkewPercentage) }}
+ </xhtml:div>
<xhtml:div class="node-label metric" *ngIf="lowWatermark">
Low Watermark: {{ lowWatermark }}
</xhtml:div>
diff --git
a/flink-runtime-web/web-dashboard/src/app/components/dagre/components/node/node.component.ts
b/flink-runtime-web/web-dashboard/src/app/components/dagre/components/node/node.component.ts
index 368a7053603..cf94c2e87e2 100644
---
a/flink-runtime-web/web-dashboard/src/app/components/dagre/components/node/node.component.ts
+++
b/flink-runtime-web/web-dashboard/src/app/components/dagre/components/node/node.component.ts
@@ -37,6 +37,7 @@ export class NodeComponent {
lowWatermark: number | null | undefined;
backPressuredPercentage: number | undefined = NaN;
busyPercentage: number | undefined = NaN;
+ dataSkewPercentage: number | undefined = NaN;
backgroundColor: string | undefined;
borderColor: string | undefined;
height = 0;
@@ -70,6 +71,7 @@ export class NodeComponent {
if (this.isValid(value.busyPercentage)) {
this.busyPercentage = value.busyPercentage;
}
+ this.dataSkewPercentage = value.dataSkewPercentage;
this.height = value.height || 0;
this.id = value.id;
if (description && description.length > 300) {
@@ -165,7 +167,7 @@ export class NodeComponent {
if (value === undefined || isNaN(value)) {
return 'N/A';
} else {
- return `${value}%`;
+ return `${Math.round(value)}%`;
}
}
}
diff --git a/flink-runtime-web/web-dashboard/src/app/interfaces/job-detail.ts
b/flink-runtime-web/web-dashboard/src/app/interfaces/job-detail.ts
index bd4b93aef18..21f5294b686 100644
--- a/flink-runtime-web/web-dashboard/src/app/interfaces/job-detail.ts
+++ b/flink-runtime-web/web-dashboard/src/app/interfaces/job-detail.ts
@@ -136,6 +136,7 @@ export interface NodesItemCorrect extends NodesItem {
lowWatermark?: number;
backPressuredPercentage?: number;
busyPercentage?: number;
+ dataSkewPercentage?: number;
}
export interface NodesItemLink {
diff --git a/flink-runtime-web/web-dashboard/src/app/interfaces/job-metrics.ts
b/flink-runtime-web/web-dashboard/src/app/interfaces/job-metrics.ts
index 1e425ad6c9d..b9d8e9728b2 100644
--- a/flink-runtime-web/web-dashboard/src/app/interfaces/job-metrics.ts
+++ b/flink-runtime-web/web-dashboard/src/app/interfaces/job-metrics.ts
@@ -25,6 +25,18 @@ export interface MetricMap {
[p: string]: number;
}
+export interface AggregateValueMap {
+ min: number;
+ max: number;
+ avg: number;
+ sum: number;
+ skew: number;
+}
+
+export interface MetricMapWithAllAggregates {
+ [metricName: string]: AggregateValueMap;
+}
+
export interface MetricMapWithTimestamp {
timestamp: number;
values: MetricMap;
diff --git
a/flink-runtime-web/web-dashboard/src/app/pages/job/dataskew/data-skew.component.html
b/flink-runtime-web/web-dashboard/src/app/pages/job/dataskew/data-skew.component.html
new file mode 100644
index 00000000000..3937e261cd3
--- /dev/null
+++
b/flink-runtime-web/web-dashboard/src/app/pages/job/dataskew/data-skew.component.html
@@ -0,0 +1,68 @@
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<nz-card nzType="inner" nzTitle="What is Data Skew?">
+ <p>
+ Your Flink job has data skew when a subset of subtasks in any of the
operators receives a
+ disproportionate number of records, potentially overloading a subset of
task managers while the
+ rest remain idle, leading to inefficient processing and potentially
backpressure and other
+ related problems.
+ </p>
+</nz-card>
+
+<nz-card nzType="inner" nzTitle="Data Skew" [nzLoading]="isLoading"
[nzExtra]="extraTemplate">
+ <nz-table
+ class="no-border small"
+ [nzSize]="'small'"
+ [nzData]="listOfVerticesAndSkew"
+ [nzFrontPagination]="false"
+ [nzShowPagination]="false"
+ >
+ <tbody>
+ <tr>
+ <th>Vertex</th>
+ <th>Data Skew Percentage</th>
+ </tr>
+ <ng-container *ngFor="let vertexSkew of listOfVerticesAndSkew">
+ <tr>
+ <td>{{ vertexSkew.vertexName }}</td>
+ <td>{{ vertexSkew.skewPct }}%</td>
+ </tr>
+ </ng-container>
+ </tbody>
+ </nz-table>
+</nz-card>
+
+<nz-card nzType="inner" nzTitle="How is Data Skew Calculated?">
+ <p>
+ Data skew is calculated using the Coefficient of Variation (CV) statistic.
The data skew
+ percentage shown here is calculated by using the
+ <i>numRecordsIn</i>
+ metric across the subtasks of your operators. The data skew percentage
under the Overview tab
+ shows a live data skew percentage by using the
+ <i>numRecordsInPerSecond</i>
+ metric and therefore can be different from the skew percentage shown here.
+ </p>
+</nz-card>
+
+<ng-template #extraTemplate>
+ <button nz-button nzType="primary" class="refresh" nzSize="small"
(click)="refresh()">
+ <i nz-icon nzType="sync"></i>
+ Refresh
+ </button>
+</ng-template>
diff --git a/flink-runtime-web/web-dashboard/src/app/interfaces/job-metrics.ts
b/flink-runtime-web/web-dashboard/src/app/pages/job/dataskew/data-skew.component.less
similarity index 74%
copy from flink-runtime-web/web-dashboard/src/app/interfaces/job-metrics.ts
copy to
flink-runtime-web/web-dashboard/src/app/pages/job/dataskew/data-skew.component.less
index 1e425ad6c9d..303f0d64ca2 100644
--- a/flink-runtime-web/web-dashboard/src/app/interfaces/job-metrics.ts
+++
b/flink-runtime-web/web-dashboard/src/app/pages/job/dataskew/data-skew.component.less
@@ -16,21 +16,18 @@
* limitations under the License.
*/
-export interface JobMetric {
- id: string;
- value: string;
-}
-
-export interface MetricMap {
- [p: string]: number;
-}
+:host {
+ ::ng-deep {
+ .nz-disable-td {
+ width: 100%;
+ }
-export interface MetricMapWithTimestamp {
- timestamp: number;
- values: MetricMap;
+ .ant-table-cell {
+ white-space: pre-wrap;
+ }
+ }
}
-export interface Watermarks {
- lowWatermark: number;
- watermarks: MetricMap;
+nz-card {
+ margin: 24px;
}
diff --git
a/flink-runtime-web/web-dashboard/src/app/pages/job/dataskew/data-skew.component.ts
b/flink-runtime-web/web-dashboard/src/app/pages/job/dataskew/data-skew.component.ts
new file mode 100644
index 00000000000..2e958f1e160
--- /dev/null
+++
b/flink-runtime-web/web-dashboard/src/app/pages/job/dataskew/data-skew.component.ts
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import { NgForOf } from '@angular/common';
+import { ChangeDetectionStrategy, Component, OnDestroy, OnInit,
ChangeDetectorRef } from '@angular/core';
+import { Subject } from 'rxjs';
+import { distinctUntilChanged, takeUntil, map, mergeMap } from
'rxjs/operators';
+
+import { JobDetailCorrect } from '@flink-runtime-web/interfaces';
+import { MetricsService } from '@flink-runtime-web/services';
+import { NzButtonModule } from 'ng-zorro-antd/button';
+import { NzCardModule } from 'ng-zorro-antd/card';
+import { NzIconModule } from 'ng-zorro-antd/icon';
+import { NzTableModule } from 'ng-zorro-antd/table';
+
+import { JobLocalService } from '../job-local.service';
+
+@Component({
+ selector: 'flink-data-skew',
+ templateUrl: './data-skew.component.html',
+ styleUrls: ['./data-skew.component.less'],
+ changeDetection: ChangeDetectionStrategy.OnPush,
+ imports: [NzButtonModule, NzCardModule, NzTableModule, NgForOf,
NzIconModule],
+ standalone: true
+})
+export class DataSkewComponent implements OnInit, OnDestroy {
+ public listOfVerticesAndSkew: Array<{ vertexName: string; skewPct: number }>
= [];
+ public isLoading = true;
+ public jobDetail: JobDetailCorrect;
+
+ private destroy$ = new Subject<void>();
+ private refresh$ = new Subject<void>();
+
+ constructor(
+ private readonly metricsService: MetricsService,
+ private readonly jobLocalService: JobLocalService,
+ private readonly cdr: ChangeDetectorRef
+ ) {}
+
+ public ngOnInit(): void {
+ this.refresh$
+ .pipe(
+ map(() => {
+ return this.jobDetail;
+ }),
+ takeUntil(this.destroy$)
+ )
+ .pipe(map(jobDetail => jobDetail.vertices))
+ .pipe(
+ mergeMap(vertices => {
+ const result: Array<{ vertexName: string; skewPct: number }> = [];
+ vertices.forEach(v => {
+ this.metricsService
+ .loadAggregatedMetrics(this.jobDetail.jid, v.id,
['numRecordsIn'], 'skew')
+ .subscribe(metricMap => {
+ const skew = Number.isNaN(+metricMap['numRecordsIn']) ? 0 :
Math.round(metricMap['numRecordsIn']);
+ result.push({ vertexName: v.name, skewPct: skew });
+ result.sort((a, b) => (a.skewPct > b.skewPct ? -1 : 1));
+ this.isLoading = false;
+ this.listOfVerticesAndSkew = result;
+ this.cdr.markForCheck();
+ });
+ });
+ return result;
+ })
+ )
+ .subscribe(_ => {}); // no-op subscriber to trigger the execution of the
lazy processing
+
+ this.jobLocalService
+ .jobDetailChanges()
+ .pipe(
+ distinctUntilChanged((pre, next) => pre.jid === next.jid),
+ takeUntil(this.destroy$)
+ )
+ .subscribe(data => {
+ this.jobDetail = data;
+ this.cdr.markForCheck();
+ this.refresh$.next();
+ });
+ }
+
+ public ngOnDestroy(): void {
+ this.destroy$.next();
+ this.destroy$.complete();
+ this.refresh$.complete();
+ }
+
+ public refresh(): void {
+ this.refresh$.next();
+ }
+}
diff --git a/flink-runtime-web/web-dashboard/src/app/pages/job/job.config.ts
b/flink-runtime-web/web-dashboard/src/app/pages/job/job.config.ts
index 0c233f88923..59ef1027518 100644
--- a/flink-runtime-web/web-dashboard/src/app/pages/job/job.config.ts
+++ b/flink-runtime-web/web-dashboard/src/app/pages/job/job.config.ts
@@ -26,6 +26,7 @@ export const JOB_MODULE_DEFAULT_CONFIG:
Required<JobModuleConfig> = {
routerTabs: [
{ title: 'Overview', path: 'overview' },
{ title: 'Exceptions', path: 'exceptions' },
+ { title: 'Data Skew', path: 'dataskew' },
{ title: 'TimeLine', path: 'timeline' },
{ title: 'Checkpoints', path: 'checkpoints' },
{ title: 'Configuration', path: 'configuration' }
diff --git
a/flink-runtime-web/web-dashboard/src/app/pages/job/modules/completed-job/routes.ts
b/flink-runtime-web/web-dashboard/src/app/pages/job/modules/completed-job/routes.ts
index dedefa4233a..496acbaf2e2 100644
---
a/flink-runtime-web/web-dashboard/src/app/pages/job/modules/completed-job/routes.ts
+++
b/flink-runtime-web/web-dashboard/src/app/pages/job/modules/completed-job/routes.ts
@@ -101,6 +101,14 @@ export const COMPLETED_JOB_ROUES: Routes = [
path: 'exceptions'
}
},
+ {
+ path: 'dataskew',
+ loadComponent: () =>
+
import('@flink-runtime-web/pages/job/dataskew/data-skew.component').then(m =>
m.DataSkewComponent),
+ data: {
+ path: 'dataskew'
+ }
+ },
{
path: 'checkpoints',
loadComponent: () =>
diff --git
a/flink-runtime-web/web-dashboard/src/app/pages/job/modules/running-job/routes.ts
b/flink-runtime-web/web-dashboard/src/app/pages/job/modules/running-job/routes.ts
index 63f84c7232b..c09afabf9ab 100644
---
a/flink-runtime-web/web-dashboard/src/app/pages/job/modules/running-job/routes.ts
+++
b/flink-runtime-web/web-dashboard/src/app/pages/job/modules/running-job/routes.ts
@@ -52,6 +52,14 @@ export const RUNNING_JOB_ROUTES: Routes = [
path: 'exceptions'
}
},
+ {
+ path: 'dataskew',
+ loadComponent: () =>
+
import('@flink-runtime-web/pages/job/dataskew/data-skew.component').then(m =>
m.DataSkewComponent),
+ data: {
+ path: 'dataskew'
+ }
+ },
{
path: 'checkpoints',
loadComponent: () =>
diff --git
a/flink-runtime-web/web-dashboard/src/app/pages/job/overview/job-overview.component.ts
b/flink-runtime-web/web-dashboard/src/app/pages/job/overview/job-overview.component.ts
index 10f7c034b40..239b56d7fb2 100644
---
a/flink-runtime-web/web-dashboard/src/app/pages/job/overview/job-overview.component.ts
+++
b/flink-runtime-web/web-dashboard/src/app/pages/job/overview/job-overview.component.ts
@@ -137,7 +137,7 @@ export class JobOverviewComponent implements OnInit,
OnDestroy {
}
public refreshNodesWithMetrics(): void {
- this.mergeWithBackPressure(this.nodes)
+ this.mergeWithBackPressureAndSkew(this.nodes)
.pipe(
mergeMap(nodes => this.mergeWithWatermarks(nodes)),
takeUntil(this.destroy$)
@@ -150,17 +150,22 @@ export class JobOverviewComponent implements OnInit,
OnDestroy {
});
}
- private mergeWithBackPressure(nodes: NodesItemCorrect[]):
Observable<NodesItemCorrect[]> {
+ private mergeWithBackPressureAndSkew(nodes: NodesItemCorrect[]):
Observable<NodesItemCorrect[]> {
return forkJoin(
nodes.map(node => {
return this.metricService
- .loadAggregatedMetrics(this.jobId, node.id,
['backPressuredTimeMsPerSecond', 'busyTimeMsPerSecond'])
+ .loadMetricsWithAllAggregates(this.jobId, node.id, [
+ 'backPressuredTimeMsPerSecond',
+ 'busyTimeMsPerSecond',
+ 'numRecordsInPerSecond'
+ ])
.pipe(
map(result => {
return {
...node,
- backPressuredPercentage:
Math.min(Math.round(result.backPressuredTimeMsPerSecond / 10), 100),
- busyPercentage: Math.min(Math.round(result.busyTimeMsPerSecond
/ 10), 100)
+ backPressuredPercentage:
Math.min(Math.round(result.backPressuredTimeMsPerSecond.max / 10), 100),
+ busyPercentage:
Math.min(Math.round(result.busyTimeMsPerSecond.max / 10), 100),
+ dataSkewPercentage: result.numRecordsInPerSecond.skew
};
})
);
diff --git
a/flink-runtime-web/web-dashboard/src/app/services/metrics.service.ts
b/flink-runtime-web/web-dashboard/src/app/services/metrics.service.ts
index e6e475b1517..cddefea45f3 100644
--- a/flink-runtime-web/web-dashboard/src/app/services/metrics.service.ts
+++ b/flink-runtime-web/web-dashboard/src/app/services/metrics.service.ts
@@ -21,7 +21,13 @@ import { Injectable } from '@angular/core';
import { Observable } from 'rxjs';
import { map } from 'rxjs/operators';
-import { MetricMap, JobMetric, Watermarks, MetricMapWithTimestamp } from
'@flink-runtime-web/interfaces';
+import {
+ MetricMap,
+ MetricMapWithAllAggregates,
+ JobMetric,
+ Watermarks,
+ MetricMapWithTimestamp
+} from '@flink-runtime-web/interfaces';
import { ConfigService } from './config.service';
@@ -71,45 +77,73 @@ export class MetricsService {
);
}
- /** Get aggregated metric data from all subtasks of the given vertexId. */
- public loadAggregatedMetrics(
+ /** Get aggregated metric data from all subtasks of the given vertexId.
Example output:
+ { "numRecordsIn": { "min": 0.0, "max": 10.0, "sum": 15.0, "avg": 5.0,
"skew": 66.0 } } */
+ public loadMetricsWithAllAggregates(
jobId: string,
vertexId: string,
- listOfMetricName: string[],
- aggregate: string = 'max'
- ): Observable<MetricMap> {
+ listOfMetricName: string[]
+ ): Observable<MetricMapWithAllAggregates> {
const metricName = listOfMetricName.join(',');
return this.httpClient
- .get<Array<{ id: string; min: number; max: number; avg: number; sum:
number }>>(
+ .get<Array<{ id: string; min: number; max: number; avg: number; sum:
number; skew: number }>>(
`${this.configService.BASE_URL}/jobs/${jobId}/vertices/${vertexId}/subtasks/metrics`,
{ params: { get: metricName } }
)
.pipe(
map(arr => {
- const result: MetricMap = {};
+ const result: MetricMapWithAllAggregates = {};
arr.forEach(item => {
- switch (aggregate) {
- case 'min':
- result[item.id] = +item.min;
- break;
- case 'max':
- result[item.id] = +item.max;
- break;
- case 'avg':
- result[item.id] = +item.avg;
- break;
- case 'sum':
- result[item.id] = +item.sum;
- break;
- default:
- throw new Error(`Unsupported aggregate: ${aggregate}`);
- }
+ result[item.id] = { min: NaN, max: NaN, avg: NaN, sum: NaN, skew:
NaN };
+ result[item.id].min = +item.min;
+ result[item.id].max = +item.max;
+ result[item.id].avg = +item.avg;
+ result[item.id].sum = +item.sum;
+ result[item.id].skew = +item.skew;
});
return result;
})
);
}
+ /** Get metric data from all subtasks of the given vertexId, aggregated by a
given aggregation type
+ Default aggregation type: max */
+ public loadAggregatedMetrics(
+ jobId: string,
+ vertexId: string,
+ listOfMetricName: string[],
+ aggregate: string = 'max'
+ ): Observable<MetricMap> {
+ const result: MetricMap = {};
+ return this.loadMetricsWithAllAggregates(jobId, vertexId,
listOfMetricName).pipe(
+ map((metricMapWithAllAggregates: MetricMapWithAllAggregates) => {
+ for (const metricName in metricMapWithAllAggregates) {
+ const value = metricMapWithAllAggregates[metricName];
+ switch (aggregate) {
+ case 'min':
+ result[metricName] = +value.min;
+ break;
+ case 'max':
+ result[metricName] = +value.max;
+ break;
+ case 'avg':
+ result[metricName] = +value.avg;
+ break;
+ case 'sum':
+ result[metricName] = +value.sum;
+ break;
+ case 'skew':
+ result[metricName] = +value.skew;
+ break;
+ default:
+ throw new Error(`Unsupported aggregate: ${aggregate}`);
+ }
+ }
+ return result;
+ })
+ );
+ }
+
public loadWatermarks(jobId: string, vertexId: string):
Observable<Watermarks> {
return this.httpClient
.get<JobMetric[]>(`${this.configService.BASE_URL}/jobs/${jobId}/vertices/${vertexId}/watermarks`)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractAggregatingMetricsHandler.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractAggregatingMetricsHandler.java
index 7c3eab054bf..038c0282096 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractAggregatingMetricsHandler.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractAggregatingMetricsHandler.java
@@ -124,12 +124,14 @@ public abstract class AbstractAggregatingMetricsHandler<
DoubleAccumulator.DoubleMaximumFactory maximumFactory
= null;
DoubleAccumulator.DoubleAverageFactory averageFactory
= null;
DoubleAccumulator.DoubleSumFactory sumFactory = null;
+ DoubleAccumulator.DoubleDataSkewFactory skewFactory =
null;
// by default we return all aggregations
if (requestedAggregations.isEmpty()) {
minimumFactory =
DoubleAccumulator.DoubleMinimumFactory.get();
maximumFactory =
DoubleAccumulator.DoubleMaximumFactory.get();
averageFactory =
DoubleAccumulator.DoubleAverageFactory.get();
sumFactory =
DoubleAccumulator.DoubleSumFactory.get();
+ skewFactory =
DoubleAccumulator.DoubleDataSkewFactory.get();
} else {
for (MetricsAggregationParameter.AggregationMode
aggregation :
requestedAggregations) {
@@ -149,6 +151,9 @@ public abstract class AbstractAggregatingMetricsHandler<
case SUM:
sumFactory =
DoubleAccumulator.DoubleSumFactory.get();
break;
+ case SKEW:
+ skewFactory =
DoubleAccumulator.DoubleDataSkewFactory.get();
+ break;
default:
log.warn(
"Unsupported aggregation
specified: {}",
@@ -158,7 +163,11 @@ public abstract class AbstractAggregatingMetricsHandler<
}
MetricAccumulatorFactory metricAccumulatorFactory =
new MetricAccumulatorFactory(
- minimumFactory, maximumFactory,
averageFactory, sumFactory);
+ minimumFactory,
+ maximumFactory,
+ averageFactory,
+ sumFactory,
+ skewFactory);
return getAggregatedMetricValues(
stores, requestedMetrics,
metricAccumulatorFactory);
@@ -245,16 +254,19 @@ public abstract class AbstractAggregatingMetricsHandler<
@Nullable private final DoubleAccumulator.DoubleAverageFactory
averageFactory;
@Nullable private final DoubleAccumulator.DoubleSumFactory sumFactory;
+ @Nullable private final DoubleAccumulator.DoubleDataSkewFactory
dataSkewFactory;
private MetricAccumulatorFactory(
@Nullable DoubleAccumulator.DoubleMinimumFactory
minimumFactory,
@Nullable DoubleAccumulator.DoubleMaximumFactory
maximumFactory,
@Nullable DoubleAccumulator.DoubleAverageFactory
averageFactory,
- @Nullable DoubleAccumulator.DoubleSumFactory sumFactory) {
+ @Nullable DoubleAccumulator.DoubleSumFactory sumFactory,
+ @Nullable DoubleAccumulator.DoubleDataSkewFactory
dataSkewFactory) {
this.minimumFactory = minimumFactory;
this.maximumFactory = maximumFactory;
this.averageFactory = averageFactory;
this.sumFactory = sumFactory;
+ this.dataSkewFactory = dataSkewFactory;
}
MetricAccumulator get(String metricName, double init) {
@@ -263,7 +275,8 @@ public abstract class AbstractAggregatingMetricsHandler<
minimumFactory == null ? null : minimumFactory.get(init),
maximumFactory == null ? null : maximumFactory.get(init),
averageFactory == null ? null : averageFactory.get(init),
- sumFactory == null ? null : sumFactory.get(init));
+ sumFactory == null ? null : sumFactory.get(init),
+ dataSkewFactory == null ? null :
dataSkewFactory.get(init));
}
}
@@ -274,18 +287,21 @@ public abstract class AbstractAggregatingMetricsHandler<
@Nullable private final DoubleAccumulator max;
@Nullable private final DoubleAccumulator avg;
@Nullable private final DoubleAccumulator sum;
+ @Nullable private final DoubleAccumulator skew;
private MetricAccumulator(
String metricName,
@Nullable DoubleAccumulator min,
@Nullable DoubleAccumulator max,
@Nullable DoubleAccumulator avg,
- @Nullable DoubleAccumulator sum) {
+ @Nullable DoubleAccumulator sum,
+ @Nullable DoubleAccumulator.DoubleDataSkew skew) {
this.metricName = Preconditions.checkNotNull(metricName);
this.min = min;
this.max = max;
this.avg = avg;
this.sum = sum;
+ this.skew = skew;
}
void add(double value) {
@@ -301,6 +317,9 @@ public abstract class AbstractAggregatingMetricsHandler<
if (sum != null) {
sum.add(value);
}
+ if (skew != null) {
+ skew.add(value);
+ }
}
AggregatedMetric get() {
@@ -309,7 +328,8 @@ public abstract class AbstractAggregatingMetricsHandler<
min == null ? null : min.getValue(),
max == null ? null : max.getValue(),
avg == null ? null : avg.getValue(),
- sum == null ? null : sum.getValue());
+ sum == null ? null : sum.getValue(),
+ skew == null ? null : skew.getValue());
}
}
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/DoubleAccumulator.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/DoubleAccumulator.java
index 4fc19a72cf9..7dd9ef1e386 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/DoubleAccumulator.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/DoubleAccumulator.java
@@ -18,6 +18,11 @@
package org.apache.flink.runtime.rest.handler.job.metrics;
+import org.apache.flink.annotation.VisibleForTesting;
+
+import java.util.ArrayList;
+import java.util.List;
+
/** An interface for accumulating double values. */
interface DoubleAccumulator {
@@ -123,6 +128,22 @@ interface DoubleAccumulator {
}
}
+ /** Factory for {@link DoubleDataSkew}. */
+ final class DoubleDataSkewFactory implements
DoubleAccumulatorFactory<DoubleDataSkew> {
+ private static final DoubleDataSkewFactory INSTANCE = new
DoubleDataSkewFactory();
+
+ private DoubleDataSkewFactory() {}
+
+ @Override
+ public DoubleDataSkew get(double init) {
+ return new DoubleDataSkew(init);
+ }
+
+ public static DoubleDataSkewFactory get() {
+ return INSTANCE;
+ }
+ }
+
/** {@link DoubleAccumulator} that returns the maximum value. */
final class DoubleMaximum implements DoubleAccumulator {
@@ -233,4 +254,52 @@ interface DoubleAccumulator {
return NAME;
}
}
+
+ /**
+ * {@link DoubleAccumulator} that returns the skew percentage over all
values. Uses a version of
+ * the Coefficient of Variation (CV) statistic to calculate skew. This
version of CV uses
+ * average absolute deviation, instead of std deviation. This method
currently assumes a dataset
+ * of positive numbers and 0.
+ */
+ final class DoubleDataSkew implements DoubleAccumulator {
+
+ public static final String NAME = "skew";
+
+ private final List<Double> values = new ArrayList<>();
+
+ @VisibleForTesting
+ DoubleDataSkew() {}
+
+ private DoubleDataSkew(double init) {
+ values.add(init);
+ }
+
+ @Override
+ public void add(double value) {
+ values.add(value);
+ }
+
+ @Override
+ public double getValue() {
+ if (values.isEmpty()) {
+ return 0.0;
+ }
+ double sum = values.stream().reduce(Double::sum).orElse(0.0);
+ double avg = sum / values.size();
+ if (avg == 0.0) {
+ // Avoid division by zero in below calculations
+ // This also makes sense because avg of 0 implies no data skew
+ return 0.0;
+ }
+ double totalAbsDev =
+ values.stream().map(v -> Math.abs(avg -
v)).reduce(Double::sum).orElse(0.0);
+ double avgDev = totalAbsDev / values.size();
+ return Math.min((avgDev / avg) * 100.0, 100.0);
+ }
+
+ @Override
+ public String getName() {
+ return NAME;
+ }
+ }
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedMetric.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedMetric.java
index e0e2b974065..bc72b8187a3 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedMetric.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedMetric.java
@@ -43,6 +43,8 @@ public class AggregatedMetric {
private static final String FIELD_NAME_SUM = "sum";
+ private static final String FIELD_NAME_SKEW = "skew";
+
@JsonProperty(value = FIELD_NAME_ID, required = true)
private final String id;
@@ -62,23 +64,29 @@ public class AggregatedMetric {
@JsonProperty(FIELD_NAME_SUM)
private final Double sum;
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ @JsonProperty(FIELD_NAME_SKEW)
+ private final Double skew;
+
@JsonCreator
public AggregatedMetric(
final @JsonProperty(value = FIELD_NAME_ID, required = true) String
id,
final @Nullable @JsonProperty(FIELD_NAME_MIN) Double min,
final @Nullable @JsonProperty(FIELD_NAME_MAX) Double max,
final @Nullable @JsonProperty(FIELD_NAME_AVG) Double avg,
- final @Nullable @JsonProperty(FIELD_NAME_SUM) Double sum) {
+ final @Nullable @JsonProperty(FIELD_NAME_SUM) Double sum,
+ final @Nullable @JsonProperty(FIELD_NAME_SKEW) Double skew) {
this.id = requireNonNull(id, "id must not be null");
this.min = min;
this.max = max;
this.avg = avg;
this.sum = sum;
+ this.skew = skew;
}
public AggregatedMetric(final @JsonProperty(value = FIELD_NAME_ID,
required = true) String id) {
- this(id, null, null, null, null);
+ this(id, null, null, null, null, null);
}
@JsonIgnore
@@ -106,6 +114,11 @@ public class AggregatedMetric {
return avg;
}
+ @JsonIgnore
+ public Double getSkew() {
+ return skew;
+ }
+
@Override
public String toString() {
return "AggregatedMetric{"
@@ -124,6 +137,9 @@ public class AggregatedMetric {
+ ", sum='"
+ sum
+ '\''
+ + ", skew='"
+ + skew
+ + '\''
+ '}';
}
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/MetricsAggregationParameter.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/MetricsAggregationParameter.java
index 143c0d6203d..2f1cfcd867a 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/MetricsAggregationParameter.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/MetricsAggregationParameter.java
@@ -59,6 +59,7 @@ public class MetricsAggregationParameter
MIN,
MAX,
SUM,
- AVG
+ AVG,
+ SKEW,
}
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingMetricsHandlerTestBase.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingMetricsHandlerTestBase.java
index 365517f8447..9c31860c205 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingMetricsHandlerTestBase.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingMetricsHandlerTestBase.java
@@ -384,5 +384,39 @@ abstract class AggregatingMetricsHandlerTestBase<
assertThat(aggregatedMetric.getMax()).isCloseTo(3.0, within(0.1));
assertThat(aggregatedMetric.getAvg()).isCloseTo(2.0, within(0.1));
assertThat(aggregatedMetric.getSum()).isCloseTo(4.0, within(0.1));
+ assertThat(aggregatedMetric.getSkew()).isCloseTo(50.0, within(0.1));
+ }
+
+ @Test
+ void testDataSkewAggregation() throws Exception {
+ Map<String, List<String>> queryParams = new HashMap<>(4);
+ queryParams.put("get", Collections.singletonList("abc.metric1"));
+ queryParams.put("agg", Collections.singletonList("skew"));
+
+ HandlerRequest<EmptyRequestBody> request =
+ HandlerRequest.resolveParametersAndCreate(
+ EmptyRequestBody.getInstance(),
+
handler.getMessageHeaders().getUnresolvedMessageParameters(),
+ pathParameters,
+ queryParams,
+ Collections.emptyList());
+
+ AggregatedMetricsResponseBody response =
+ handler.handleRequest(request, MOCK_DISPATCHER_GATEWAY).get();
+
+ Collection<AggregatedMetric> aggregatedMetrics = response.getMetrics();
+
+ assertThat(aggregatedMetrics).hasSize(1);
+ AggregatedMetric aggregatedMetric =
aggregatedMetrics.iterator().next();
+
+ assertThat(aggregatedMetric.getId()).isEqualTo("abc.metric1");
+ // abc.metric1 has the data points: [1,3], avg=2
+ // mean absolute deviation = (abs(1-2) + abs(3-2))/2 = 1
+ // data skew = mean_abs_deviation/avg * 100 = 50
+ assertThat(aggregatedMetric.getSkew()).isCloseTo(50.0, within(0.1));
+ assertThat(aggregatedMetric.getMin()).isNull();
+ assertThat(aggregatedMetric.getAvg()).isNull();
+ assertThat(aggregatedMetric.getMax()).isNull();
+ assertThat(aggregatedMetric.getSum()).isNull();
}
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/DoubleAccumulatorTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/DoubleAccumulatorTest.java
new file mode 100644
index 00000000000..cdb9b2742e1
--- /dev/null
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/DoubleAccumulatorTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.metrics;
+
+import org.junit.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+import static org.assertj.core.api.AssertionsForClassTypes.within;
+
+public class DoubleAccumulatorTest {
+
+ @ParameterizedTest
+ @MethodSource("dataSkewTests")
+ public void testDataSkew(double n1, double n2, double n3, double
expectedSkew) {
+ DoubleAccumulator.DoubleDataSkew dataSkew =
+ DoubleAccumulator.DoubleDataSkewFactory.get().get(n1);
+ dataSkew.add(n2);
+ dataSkew.add(n3);
+ assertThat(dataSkew.getValue()).isCloseTo(expectedSkew, within(0.5));
+ }
+
+ @Test
+ public void testDataSkewOnEmptyList() {
+ DoubleAccumulator.DoubleDataSkew dataSkew = new
DoubleAccumulator.DoubleDataSkew();
+ assertThat(dataSkew.getValue()).isEqualTo(0.0);
+ }
+
+ @Test
+ public void testDataSkewOnSingleValueList() {
+ DoubleAccumulator.DoubleDataSkew dataSkew =
+ DoubleAccumulator.DoubleDataSkewFactory.get().get(123);
+ assertThat(dataSkew.getValue()).isEqualTo(0.0);
+ }
+
+ private static Stream<Arguments> dataSkewTests() {
+ // Data set, followed by the expected data skew percentage
+ return Stream.of(
+ // Avg: (23 + 3 + 10) / 3 = 12
+ // Avg Absolute Deviation = ( (23 - 12) + (12 - 3) + ( 12 -
10) ) / 3 = 7.33
+ // Skew Percentage = 7.33/12 * 100 -> 61%
+ Arguments.of(23.0, 3.0, 10.0, 61.0),
+ // Avg: (300 + 0 + 0) / 3 = 100
+ // Avg Absolute Deviation = ( (300 - 100) + (100 - 0) + (100 -
0) ) / 3 = 133
+ // Skew Percentage = 133/100 * 100 -> 133% should be capped at
100
+ Arguments.of(300.0, 0.0, 0.0, 100.0),
+ // Test against any possible division by zero errors
+ Arguments.of(0.0, 0.0, 0.0, 0.0),
+            // Test low skew
+ Arguments.of(50.0, 51.0, 52.0, 1.0));
+ }
+}