This is an automated email from the ASF dual-hosted git repository.

hong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 080efb9c410 [FLINK-34025] Add data skew metric endpoint and UI
080efb9c410 is described below

commit 080efb9c410102a5d12d31bb2af5a3faa3391736
Author: Emre Kartoglu <[email protected]>
AuthorDate: Thu Apr 4 12:02:21 2024 +0100

    [FLINK-34025] Add data skew metric endpoint and UI
---
 .../dagre/components/node/node.component.html      |   3 +
 .../dagre/components/node/node.component.ts        |   4 +-
 .../web-dashboard/src/app/interfaces/job-detail.ts |   1 +
 .../src/app/interfaces/job-metrics.ts              |  12 +++
 .../pages/job/dataskew/data-skew.component.html    |  68 +++++++++++++
 .../job/dataskew/data-skew.component.less}         |  25 +++--
 .../app/pages/job/dataskew/data-skew.component.ts  | 106 +++++++++++++++++++++
 .../web-dashboard/src/app/pages/job/job.config.ts  |   1 +
 .../app/pages/job/modules/completed-job/routes.ts  |   8 ++
 .../app/pages/job/modules/running-job/routes.ts    |   8 ++
 .../pages/job/overview/job-overview.component.ts   |  15 ++-
 .../src/app/services/metrics.service.ts            |  82 +++++++++++-----
 .../metrics/AbstractAggregatingMetricsHandler.java |  30 +++++-
 .../handler/job/metrics/DoubleAccumulator.java     |  69 ++++++++++++++
 .../messages/job/metrics/AggregatedMetric.java     |  20 +++-
 .../job/metrics/MetricsAggregationParameter.java   |   3 +-
 .../metrics/AggregatingMetricsHandlerTestBase.java |  34 +++++++
 .../handler/job/metrics/DoubleAccumulatorTest.java |  72 ++++++++++++++
 18 files changed, 509 insertions(+), 52 deletions(-)

diff --git 
a/flink-runtime-web/web-dashboard/src/app/components/dagre/components/node/node.component.html
 
b/flink-runtime-web/web-dashboard/src/app/components/dagre/components/node/node.component.html
index 6add6d60ea6..1cf2b255922 100644
--- 
a/flink-runtime-web/web-dashboard/src/app/components/dagre/components/node/node.component.html
+++ 
b/flink-runtime-web/web-dashboard/src/app/components/dagre/components/node/node.component.html
@@ -41,6 +41,9 @@
         <xhtml:div class="node-label metric" title="Maximum busy percentage 
across all subtasks">
           Busy (max): {{ prettyPrint(busyPercentage) }}
         </xhtml:div>
+        <xhtml:div class="node-label metric" title="Data skew percentage 
across all subtasks">
+          Data Skew: {{ prettyPrint(dataSkewPercentage) }}
+        </xhtml:div>
         <xhtml:div class="node-label metric" *ngIf="lowWatermark">
           Low Watermark: {{ lowWatermark }}
         </xhtml:div>
diff --git 
a/flink-runtime-web/web-dashboard/src/app/components/dagre/components/node/node.component.ts
 
b/flink-runtime-web/web-dashboard/src/app/components/dagre/components/node/node.component.ts
index 368a7053603..cf94c2e87e2 100644
--- 
a/flink-runtime-web/web-dashboard/src/app/components/dagre/components/node/node.component.ts
+++ 
b/flink-runtime-web/web-dashboard/src/app/components/dagre/components/node/node.component.ts
@@ -37,6 +37,7 @@ export class NodeComponent {
   lowWatermark: number | null | undefined;
   backPressuredPercentage: number | undefined = NaN;
   busyPercentage: number | undefined = NaN;
+  dataSkewPercentage: number | undefined = NaN;
   backgroundColor: string | undefined;
   borderColor: string | undefined;
   height = 0;
@@ -70,6 +71,7 @@ export class NodeComponent {
     if (this.isValid(value.busyPercentage)) {
       this.busyPercentage = value.busyPercentage;
     }
+    this.dataSkewPercentage = value.dataSkewPercentage;
     this.height = value.height || 0;
     this.id = value.id;
     if (description && description.length > 300) {
@@ -165,7 +167,7 @@ export class NodeComponent {
     if (value === undefined || isNaN(value)) {
       return 'N/A';
     } else {
-      return `${value}%`;
+      return `${Math.round(value)}%`;
     }
   }
 }
diff --git a/flink-runtime-web/web-dashboard/src/app/interfaces/job-detail.ts 
b/flink-runtime-web/web-dashboard/src/app/interfaces/job-detail.ts
index bd4b93aef18..21f5294b686 100644
--- a/flink-runtime-web/web-dashboard/src/app/interfaces/job-detail.ts
+++ b/flink-runtime-web/web-dashboard/src/app/interfaces/job-detail.ts
@@ -136,6 +136,7 @@ export interface NodesItemCorrect extends NodesItem {
   lowWatermark?: number;
   backPressuredPercentage?: number;
   busyPercentage?: number;
+  dataSkewPercentage?: number;
 }
 
 export interface NodesItemLink {
diff --git a/flink-runtime-web/web-dashboard/src/app/interfaces/job-metrics.ts 
b/flink-runtime-web/web-dashboard/src/app/interfaces/job-metrics.ts
index 1e425ad6c9d..b9d8e9728b2 100644
--- a/flink-runtime-web/web-dashboard/src/app/interfaces/job-metrics.ts
+++ b/flink-runtime-web/web-dashboard/src/app/interfaces/job-metrics.ts
@@ -25,6 +25,21 @@ export interface MetricMap {
   [p: string]: number;
 }
 
+/** Every aggregate of one metric across the subtasks of a vertex. */
+export interface AggregateValueMap {
+  min: number;
+  max: number;
+  avg: number;
+  sum: number;
+  /** Data skew percentage computed by the REST API's `skew` aggregation (capped at 100). */
+  skew: number;
+}
+
+/** The aggregates of each requested metric, keyed by metric name. */
+export interface MetricMapWithAllAggregates {
+  [metricName: string]: AggregateValueMap;
+}
+
 export interface MetricMapWithTimestamp {
   timestamp: number;
   values: MetricMap;
diff --git 
a/flink-runtime-web/web-dashboard/src/app/pages/job/dataskew/data-skew.component.html
 
b/flink-runtime-web/web-dashboard/src/app/pages/job/dataskew/data-skew.component.html
new file mode 100644
index 00000000000..3937e261cd3
--- /dev/null
+++ 
b/flink-runtime-web/web-dashboard/src/app/pages/job/dataskew/data-skew.component.html
@@ -0,0 +1,68 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<nz-card nzType="inner" nzTitle="What is Data Skew?">
+  <p>
+    Your Flink job has data skew when a subset of the subtasks of an operator receives a
+    disproportionate number of records. This can overload the task managers running those subtasks
+    while the others stay idle, leading to inefficient processing and, potentially, backpressure and
+    other related problems.
+  </p>
+</nz-card>
+
+<nz-card nzType="inner" nzTitle="Data Skew" [nzLoading]="isLoading" 
[nzExtra]="extraTemplate">
+  <nz-table
+    class="no-border small"
+    [nzSize]="'small'"
+    [nzData]="listOfVerticesAndSkew"
+    [nzFrontPagination]="false"
+    [nzShowPagination]="false"
+  >
+    <tbody>
+      <tr>
+        <th>Vertex</th>
+        <th>Data Skew Percentage</th>
+      </tr>
+      <ng-container *ngFor="let vertexSkew of listOfVerticesAndSkew">
+        <tr>
+          <td>{{ vertexSkew.vertexName }}</td>
+          <td>{{ vertexSkew.skewPct }}%</td>
+        </tr>
+      </ng-container>
+    </tbody>
+  </nz-table>
+</nz-card>
+
+<nz-card nzType="inner" nzTitle="How is Data Skew Calculated?">
+  <p>
+    Data skew is calculated using a variant of the Coefficient of Variation (CV) statistic: the mean
+    absolute deviation divided by the mean, capped at 100%. The data skew percentage shown here uses the
+    <i>numRecordsIn</i>
+    metric across the subtasks of each operator. The data skew percentage on the Overview tab is a live
+    value based on the
+    <i>numRecordsInPerSecond</i>
+    metric and can therefore differ from the percentage shown here.
+  </p>
+</nz-card>
+
+<ng-template #extraTemplate>
+  <button nz-button nzType="primary" class="refresh" nzSize="small" 
(click)="refresh()">
+    <i nz-icon nzType="sync"></i>
+    Refresh
+  </button>
+</ng-template>
diff --git a/flink-runtime-web/web-dashboard/src/app/interfaces/job-metrics.ts 
b/flink-runtime-web/web-dashboard/src/app/pages/job/dataskew/data-skew.component.less
similarity index 74%
copy from flink-runtime-web/web-dashboard/src/app/interfaces/job-metrics.ts
copy to 
flink-runtime-web/web-dashboard/src/app/pages/job/dataskew/data-skew.component.less
index 1e425ad6c9d..303f0d64ca2 100644
--- a/flink-runtime-web/web-dashboard/src/app/interfaces/job-metrics.ts
+++ 
b/flink-runtime-web/web-dashboard/src/app/pages/job/dataskew/data-skew.component.less
@@ -16,21 +16,18 @@
  * limitations under the License.
  */
 
-export interface JobMetric {
-  id: string;
-  value: string;
-}
-
-export interface MetricMap {
-  [p: string]: number;
-}
+:host {
+  ::ng-deep {
+    .nz-disable-td {
+      width: 100%;
+    }
 
-export interface MetricMapWithTimestamp {
-  timestamp: number;
-  values: MetricMap;
+    .ant-table-cell {
+      white-space: pre-wrap;
+    }
+  }
 }
 
-export interface Watermarks {
-  lowWatermark: number;
-  watermarks: MetricMap;
+nz-card {
+  margin: 24px;
 }
diff --git 
a/flink-runtime-web/web-dashboard/src/app/pages/job/dataskew/data-skew.component.ts
 
b/flink-runtime-web/web-dashboard/src/app/pages/job/dataskew/data-skew.component.ts
new file mode 100644
index 00000000000..2e958f1e160
--- /dev/null
+++ 
b/flink-runtime-web/web-dashboard/src/app/pages/job/dataskew/data-skew.component.ts
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import { NgForOf } from '@angular/common';
+import { ChangeDetectionStrategy, Component, OnDestroy, OnInit, ChangeDetectorRef } from '@angular/core';
+import { forkJoin, Observable, of, Subject } from 'rxjs';
+import { catchError, distinctUntilChanged, filter, map, switchMap, takeUntil, tap } from 'rxjs/operators';
+
+import { JobDetailCorrect } from '@flink-runtime-web/interfaces';
+import { MetricsService } from '@flink-runtime-web/services';
+import { NzButtonModule } from 'ng-zorro-antd/button';
+import { NzCardModule } from 'ng-zorro-antd/card';
+import { NzIconModule } from 'ng-zorro-antd/icon';
+import { NzTableModule } from 'ng-zorro-antd/table';
+
+import { JobLocalService } from '../job-local.service';
+
+/** A single row of the data skew table. */
+export interface VertexSkew {
+  /** Display name of the job vertex. */
+  vertexName: string;
+  /** `numRecordsIn` skew of the vertex, rounded to an integer percentage. */
+  skewPct: number;
+}
+
+/**
+ * Lists the data skew of every vertex of the current job, computed by the REST API from the
+ * `numRecordsIn` metric of the vertex's subtasks and sorted from most to least skewed.
+ */
+@Component({
+  selector: 'flink-data-skew',
+  templateUrl: './data-skew.component.html',
+  styleUrls: ['./data-skew.component.less'],
+  changeDetection: ChangeDetectionStrategy.OnPush,
+  imports: [NzButtonModule, NzCardModule, NzTableModule, NgForOf, NzIconModule],
+  standalone: true
+})
+export class DataSkewComponent implements OnInit, OnDestroy {
+  public listOfVerticesAndSkew: VertexSkew[] = [];
+  public isLoading = true;
+  public jobDetail: JobDetailCorrect;
+
+  private destroy$ = new Subject<void>();
+  private refresh$ = new Subject<void>();
+
+  constructor(
+    private readonly metricsService: MetricsService,
+    private readonly jobLocalService: JobLocalService,
+    private readonly cdr: ChangeDetectorRef
+  ) {}
+
+  public ngOnInit(): void {
+    this.refresh$
+      .pipe(
+        // A refresh requested before the job detail has arrived has nothing to load.
+        filter(() => !!this.jobDetail),
+        tap(() => {
+          this.isLoading = true;
+          this.cdr.markForCheck();
+        }),
+        // switchMap discards the pending requests of a superseded refresh, so a slow earlier
+        // response can no longer overwrite newer results; takeUntil comes last so those inner
+        // requests are cancelled on destroy as well.
+        switchMap(() => this.loadVertexSkews(this.jobDetail)),
+        takeUntil(this.destroy$)
+      )
+      .subscribe(rows => {
+        this.listOfVerticesAndSkew = rows;
+        this.isLoading = false;
+        this.cdr.markForCheck();
+      });
+
+    this.jobLocalService
+      .jobDetailChanges()
+      .pipe(
+        distinctUntilChanged((pre, next) => pre.jid === next.jid),
+        takeUntil(this.destroy$)
+      )
+      .subscribe(data => {
+        this.jobDetail = data;
+        this.cdr.markForCheck();
+        this.refresh$.next();
+      });
+  }
+
+  public ngOnDestroy(): void {
+    this.destroy$.next();
+    this.destroy$.complete();
+    this.refresh$.complete();
+  }
+
+  public refresh(): void {
+    this.refresh$.next();
+  }
+
+  /**
+   * Fetches the `numRecordsIn` skew aggregate of every vertex in parallel and emits all rows once,
+   * sorted by descending skew. A missing/NaN value is shown as 0%; a vertex whose request fails
+   * is left out instead of failing the whole table.
+   */
+  private loadVertexSkews(jobDetail: JobDetailCorrect): Observable<VertexSkew[]> {
+    if (jobDetail.vertices.length === 0) {
+      // forkJoin([]) completes without emitting, which would leave the card loading forever.
+      return of([]);
+    }
+    return forkJoin(
+      jobDetail.vertices.map(vertex =>
+        this.metricsService.loadAggregatedMetrics(jobDetail.jid, vertex.id, ['numRecordsIn'], 'skew').pipe(
+          map(metricMap => {
+            const skew = +metricMap['numRecordsIn'];
+            return { vertexName: vertex.name, skewPct: Number.isNaN(skew) ? 0 : Math.round(skew) };
+          }),
+          catchError(() => of(null))
+        )
+      )
+    ).pipe(
+      map(rows => rows.filter((row): row is VertexSkew => row !== null).sort((a, b) => b.skewPct - a.skewPct))
+    );
+  }
+}
diff --git a/flink-runtime-web/web-dashboard/src/app/pages/job/job.config.ts 
b/flink-runtime-web/web-dashboard/src/app/pages/job/job.config.ts
index 0c233f88923..59ef1027518 100644
--- a/flink-runtime-web/web-dashboard/src/app/pages/job/job.config.ts
+++ b/flink-runtime-web/web-dashboard/src/app/pages/job/job.config.ts
@@ -26,6 +26,7 @@ export const JOB_MODULE_DEFAULT_CONFIG: 
Required<JobModuleConfig> = {
   routerTabs: [
     { title: 'Overview', path: 'overview' },
     { title: 'Exceptions', path: 'exceptions' },
+    { title: 'Data Skew', path: 'dataskew' },
     { title: 'TimeLine', path: 'timeline' },
     { title: 'Checkpoints', path: 'checkpoints' },
     { title: 'Configuration', path: 'configuration' }
diff --git 
a/flink-runtime-web/web-dashboard/src/app/pages/job/modules/completed-job/routes.ts
 
b/flink-runtime-web/web-dashboard/src/app/pages/job/modules/completed-job/routes.ts
index dedefa4233a..496acbaf2e2 100644
--- 
a/flink-runtime-web/web-dashboard/src/app/pages/job/modules/completed-job/routes.ts
+++ 
b/flink-runtime-web/web-dashboard/src/app/pages/job/modules/completed-job/routes.ts
@@ -101,6 +101,14 @@ export const COMPLETED_JOB_ROUES: Routes = [
           path: 'exceptions'
         }
       },
+      {
+        path: 'dataskew',
+        loadComponent: () =>
+          
import('@flink-runtime-web/pages/job/dataskew/data-skew.component').then(m => 
m.DataSkewComponent),
+        data: {
+          path: 'dataskew'
+        }
+      },
       {
         path: 'checkpoints',
         loadComponent: () =>
diff --git 
a/flink-runtime-web/web-dashboard/src/app/pages/job/modules/running-job/routes.ts
 
b/flink-runtime-web/web-dashboard/src/app/pages/job/modules/running-job/routes.ts
index 63f84c7232b..c09afabf9ab 100644
--- 
a/flink-runtime-web/web-dashboard/src/app/pages/job/modules/running-job/routes.ts
+++ 
b/flink-runtime-web/web-dashboard/src/app/pages/job/modules/running-job/routes.ts
@@ -52,6 +52,14 @@ export const RUNNING_JOB_ROUTES: Routes = [
           path: 'exceptions'
         }
       },
+      {
+        path: 'dataskew',
+        loadComponent: () =>
+          
import('@flink-runtime-web/pages/job/dataskew/data-skew.component').then(m => 
m.DataSkewComponent),
+        data: {
+          path: 'dataskew'
+        }
+      },
       {
         path: 'checkpoints',
         loadComponent: () =>
diff --git 
a/flink-runtime-web/web-dashboard/src/app/pages/job/overview/job-overview.component.ts
 
b/flink-runtime-web/web-dashboard/src/app/pages/job/overview/job-overview.component.ts
index 10f7c034b40..239b56d7fb2 100644
--- 
a/flink-runtime-web/web-dashboard/src/app/pages/job/overview/job-overview.component.ts
+++ 
b/flink-runtime-web/web-dashboard/src/app/pages/job/overview/job-overview.component.ts
@@ -137,7 +137,7 @@ export class JobOverviewComponent implements OnInit, 
OnDestroy {
   }
 
   public refreshNodesWithMetrics(): void {
-    this.mergeWithBackPressure(this.nodes)
+    this.mergeWithBackPressureAndSkew(this.nodes)
       .pipe(
         mergeMap(nodes => this.mergeWithWatermarks(nodes)),
         takeUntil(this.destroy$)
@@ -150,17 +150,22 @@ export class JobOverviewComponent implements OnInit, 
OnDestroy {
       });
   }
 
-  private mergeWithBackPressure(nodes: NodesItemCorrect[]): 
Observable<NodesItemCorrect[]> {
+  private mergeWithBackPressureAndSkew(nodes: NodesItemCorrect[]): 
Observable<NodesItemCorrect[]> {
     return forkJoin(
       nodes.map(node => {
         return this.metricService
-          .loadAggregatedMetrics(this.jobId, node.id, 
['backPressuredTimeMsPerSecond', 'busyTimeMsPerSecond'])
+          .loadMetricsWithAllAggregates(this.jobId, node.id, [
+            'backPressuredTimeMsPerSecond',
+            'busyTimeMsPerSecond',
+            'numRecordsInPerSecond'
+          ])
           .pipe(
             map(result => {
               return {
                 ...node,
-                backPressuredPercentage: 
Math.min(Math.round(result.backPressuredTimeMsPerSecond / 10), 100),
-                busyPercentage: Math.min(Math.round(result.busyTimeMsPerSecond 
/ 10), 100)
+                backPressuredPercentage: 
Math.min(Math.round(result.backPressuredTimeMsPerSecond.max / 10), 100),
+                busyPercentage: 
Math.min(Math.round(result.busyTimeMsPerSecond.max / 10), 100),
+                dataSkewPercentage: result.numRecordsInPerSecond.skew
               };
             })
           );
diff --git 
a/flink-runtime-web/web-dashboard/src/app/services/metrics.service.ts 
b/flink-runtime-web/web-dashboard/src/app/services/metrics.service.ts
index e6e475b1517..cddefea45f3 100644
--- a/flink-runtime-web/web-dashboard/src/app/services/metrics.service.ts
+++ b/flink-runtime-web/web-dashboard/src/app/services/metrics.service.ts
@@ -21,7 +21,13 @@ import { Injectable } from '@angular/core';
 import { Observable } from 'rxjs';
 import { map } from 'rxjs/operators';
 
-import { MetricMap, JobMetric, Watermarks, MetricMapWithTimestamp } from 
'@flink-runtime-web/interfaces';
+import {
+  MetricMap,
+  MetricMapWithAllAggregates,
+  JobMetric,
+  Watermarks,
+  MetricMapWithTimestamp
+} from '@flink-runtime-web/interfaces';
 
 import { ConfigService } from './config.service';
 
@@ -71,45 +77,87 @@ export class MetricsService {
       );
   }
 
-  /** Get aggregated metric data from all subtasks of the given vertexId. */
-  public loadAggregatedMetrics(
+  /**
+   * Gets every aggregate (min, max, avg, sum and skew) of the given metrics across all subtasks of
+   * the given vertex, keyed by metric name. Example output:
+   * { "numRecordsIn": { "min": 0.0, "max": 10.0, "sum": 15.0, "avg": 5.0, "skew": 66.0 } }
+   * Only metrics returned by the REST endpoint appear in the result.
+   */
+  public loadMetricsWithAllAggregates(
     jobId: string,
     vertexId: string,
-    listOfMetricName: string[],
-    aggregate: string = 'max'
-  ): Observable<MetricMap> {
+    listOfMetricName: string[]
+  ): Observable<MetricMapWithAllAggregates> {
     const metricName = listOfMetricName.join(',');
     return this.httpClient
-      .get<Array<{ id: string; min: number; max: number; avg: number; sum: number }>>(
+      .get<Array<{ id: string; min: number; max: number; avg: number; sum: number; skew: number }>>(
         `${this.configService.BASE_URL}/jobs/${jobId}/vertices/${vertexId}/subtasks/metrics`,
         { params: { get: metricName } }
       )
       .pipe(
         map(arr => {
-          const result: MetricMap = {};
+          const result: MetricMapWithAllAggregates = {};
           arr.forEach(item => {
-            switch (aggregate) {
-              case 'min':
-                result[item.id] = +item.min;
-                break;
-              case 'max':
-                result[item.id] = +item.max;
-                break;
-              case 'avg':
-                result[item.id] = +item.avg;
-                break;
-              case 'sum':
-                result[item.id] = +item.sum;
-                break;
-              default:
-                throw new Error(`Unsupported aggregate: ${aggregate}`);
-            }
+            // As before, unary + yields NaN for an aggregate the server left out of the response.
+            result[item.id] = {
+              min: +item.min,
+              max: +item.max,
+              avg: +item.avg,
+              sum: +item.sum,
+              skew: +item.skew
+            };
           });
           return result;
         })
       );
   }
 
+  /**
+   * Gets the given metrics across all subtasks of the given vertex, reduced to one aggregate per
+   * metric and keyed by metric name.
+   *
+   * @param aggregate one of 'min', 'max' (default), 'avg', 'sum' or 'skew'
+   * @returns an Observable that errors with `Unsupported aggregate: <aggregate>` when `aggregate` is
+   *   not one of the above and at least one metric was returned
+   */
+  public loadAggregatedMetrics(
+    jobId: string,
+    vertexId: string,
+    listOfMetricName: string[],
+    aggregate: string = 'max'
+  ): Observable<MetricMap> {
+    return this.loadMetricsWithAllAggregates(jobId, vertexId, listOfMetricName).pipe(
+      map((metricMapWithAllAggregates: MetricMapWithAllAggregates) => {
+        // Allocate the result per emission; a map shared by all subscriptions would let one
+        // response overwrite (and leak stale keys into) the result handed to another.
+        const result: MetricMap = {};
+        for (const metricName of Object.keys(metricMapWithAllAggregates)) {
+          const value = metricMapWithAllAggregates[metricName];
+          switch (aggregate) {
+            case 'min':
+              result[metricName] = value.min;
+              break;
+            case 'max':
+              result[metricName] = value.max;
+              break;
+            case 'avg':
+              result[metricName] = value.avg;
+              break;
+            case 'sum':
+              result[metricName] = value.sum;
+              break;
+            case 'skew':
+              result[metricName] = value.skew;
+              break;
+            default:
+              throw new Error(`Unsupported aggregate: ${aggregate}`);
+          }
+        }
+        return result;
+      })
+    );
+  }
+
   public loadWatermarks(jobId: string, vertexId: string): Observable<Watermarks> {
     return this.httpClient
       .get<JobMetric[]>(`${this.configService.BASE_URL}/jobs/${jobId}/vertices/${vertexId}/watermarks`)
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractAggregatingMetricsHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractAggregatingMetricsHandler.java
index 7c3eab054bf..038c0282096 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractAggregatingMetricsHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractAggregatingMetricsHandler.java
@@ -124,12 +124,14 @@ public abstract class AbstractAggregatingMetricsHandler<
                         DoubleAccumulator.DoubleMaximumFactory maximumFactory 
= null;
                         DoubleAccumulator.DoubleAverageFactory averageFactory 
= null;
                         DoubleAccumulator.DoubleSumFactory sumFactory = null;
+                        DoubleAccumulator.DoubleDataSkewFactory skewFactory = 
null;
                         // by default we return all aggregations
                         if (requestedAggregations.isEmpty()) {
                             minimumFactory = 
DoubleAccumulator.DoubleMinimumFactory.get();
                             maximumFactory = 
DoubleAccumulator.DoubleMaximumFactory.get();
                             averageFactory = 
DoubleAccumulator.DoubleAverageFactory.get();
                             sumFactory = 
DoubleAccumulator.DoubleSumFactory.get();
+                            skewFactory = 
DoubleAccumulator.DoubleDataSkewFactory.get();
                         } else {
                             for (MetricsAggregationParameter.AggregationMode 
aggregation :
                                     requestedAggregations) {
@@ -149,6 +151,9 @@ public abstract class AbstractAggregatingMetricsHandler<
                                     case SUM:
                                         sumFactory = 
DoubleAccumulator.DoubleSumFactory.get();
                                         break;
+                                    case SKEW:
+                                        skewFactory = 
DoubleAccumulator.DoubleDataSkewFactory.get();
+                                        break;
                                     default:
                                         log.warn(
                                                 "Unsupported aggregation 
specified: {}",
@@ -158,7 +163,11 @@ public abstract class AbstractAggregatingMetricsHandler<
                         }
                         MetricAccumulatorFactory metricAccumulatorFactory =
                                 new MetricAccumulatorFactory(
-                                        minimumFactory, maximumFactory, 
averageFactory, sumFactory);
+                                        minimumFactory,
+                                        maximumFactory,
+                                        averageFactory,
+                                        sumFactory,
+                                        skewFactory);
 
                         return getAggregatedMetricValues(
                                 stores, requestedMetrics, 
metricAccumulatorFactory);
@@ -245,16 +254,19 @@ public abstract class AbstractAggregatingMetricsHandler<
         @Nullable private final DoubleAccumulator.DoubleAverageFactory 
averageFactory;
 
         @Nullable private final DoubleAccumulator.DoubleSumFactory sumFactory;
+        @Nullable private final DoubleAccumulator.DoubleDataSkewFactory 
dataSkewFactory;
 
         private MetricAccumulatorFactory(
                 @Nullable DoubleAccumulator.DoubleMinimumFactory 
minimumFactory,
                 @Nullable DoubleAccumulator.DoubleMaximumFactory 
maximumFactory,
                 @Nullable DoubleAccumulator.DoubleAverageFactory 
averageFactory,
-                @Nullable DoubleAccumulator.DoubleSumFactory sumFactory) {
+                @Nullable DoubleAccumulator.DoubleSumFactory sumFactory,
+                @Nullable DoubleAccumulator.DoubleDataSkewFactory 
dataSkewFactory) {
             this.minimumFactory = minimumFactory;
             this.maximumFactory = maximumFactory;
             this.averageFactory = averageFactory;
             this.sumFactory = sumFactory;
+            this.dataSkewFactory = dataSkewFactory;
         }
 
         MetricAccumulator get(String metricName, double init) {
@@ -263,7 +275,8 @@ public abstract class AbstractAggregatingMetricsHandler<
                     minimumFactory == null ? null : minimumFactory.get(init),
                     maximumFactory == null ? null : maximumFactory.get(init),
                     averageFactory == null ? null : averageFactory.get(init),
-                    sumFactory == null ? null : sumFactory.get(init));
+                    sumFactory == null ? null : sumFactory.get(init),
+                    dataSkewFactory == null ? null : 
dataSkewFactory.get(init));
         }
     }
 
@@ -274,18 +287,21 @@ public abstract class AbstractAggregatingMetricsHandler<
         @Nullable private final DoubleAccumulator max;
         @Nullable private final DoubleAccumulator avg;
         @Nullable private final DoubleAccumulator sum;
+        @Nullable private final DoubleAccumulator skew;
 
         private MetricAccumulator(
                 String metricName,
                 @Nullable DoubleAccumulator min,
                 @Nullable DoubleAccumulator max,
                 @Nullable DoubleAccumulator avg,
-                @Nullable DoubleAccumulator sum) {
+                @Nullable DoubleAccumulator sum,
+                @Nullable DoubleAccumulator.DoubleDataSkew skew) {
             this.metricName = Preconditions.checkNotNull(metricName);
             this.min = min;
             this.max = max;
             this.avg = avg;
             this.sum = sum;
+            this.skew = skew;
         }
 
         void add(double value) {
@@ -301,6 +317,9 @@ public abstract class AbstractAggregatingMetricsHandler<
             if (sum != null) {
                 sum.add(value);
             }
+            if (skew != null) {
+                skew.add(value);
+            }
         }
 
         AggregatedMetric get() {
@@ -309,7 +328,8 @@ public abstract class AbstractAggregatingMetricsHandler<
                     min == null ? null : min.getValue(),
                     max == null ? null : max.getValue(),
                     avg == null ? null : avg.getValue(),
-                    sum == null ? null : sum.getValue());
+                    sum == null ? null : sum.getValue(),
+                    skew == null ? null : skew.getValue());
         }
     }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/DoubleAccumulator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/DoubleAccumulator.java
index 4fc19a72cf9..7dd9ef1e386 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/DoubleAccumulator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/DoubleAccumulator.java
@@ -18,6 +18,11 @@
 
 package org.apache.flink.runtime.rest.handler.job.metrics;
 
+import org.apache.flink.annotation.VisibleForTesting;
+
+import java.util.ArrayList;
+import java.util.List;
+
 /** An interface for accumulating double values. */
 interface DoubleAccumulator {
 
@@ -123,6 +128,22 @@ interface DoubleAccumulator {
         }
     }
 
+    /** Factory for {@link DoubleDataSkew}. */
+    final class DoubleDataSkewFactory implements DoubleAccumulatorFactory<DoubleDataSkew> {
+        private static final DoubleDataSkewFactory INSTANCE = new DoubleDataSkewFactory();
+
+        private DoubleDataSkewFactory() {}
+
+        @Override
+        public DoubleDataSkew get(double init) {
+            return new DoubleDataSkew(init);
+        }
+
+        public static DoubleDataSkewFactory get() {
+            return INSTANCE;
+        }
+    }
+
     /** {@link DoubleAccumulator} that returns the maximum value. */
     final class DoubleMaximum implements DoubleAccumulator {
 
@@ -233,4 +254,56 @@ interface DoubleAccumulator {
             return NAME;
         }
     }
+
+    /**
+     * {@link DoubleAccumulator} that returns the data skew of all values as a percentage.
+     *
+     * <p>The skew is a variant of the Coefficient of Variation (CV): the mean absolute deviation
+     * (used instead of the standard deviation) divided by the mean, multiplied by 100 and capped at
+     * 100. A mean of 0 yields 0, i.e. no skew. The values are assumed to be non-negative (e.g.
+     * record counts or rates), which keeps the result within [0, 100]; for negative values the
+     * result is not meaningful.
+     *
+     * <p>All values are retained until {@link #getValue()} is called because the deviation can
+     * only be computed once the mean is known.
+     */
+    final class DoubleDataSkew implements DoubleAccumulator {
+
+        public static final String NAME = "skew";
+
+        private final List<Double> values = new ArrayList<>();
+
+        @VisibleForTesting
+        DoubleDataSkew() {}
+
+        private DoubleDataSkew(double init) {
+            values.add(init);
+        }
+
+        @Override
+        public void add(double value) {
+            values.add(value);
+        }
+
+        @Override
+        public double getValue() {
+            if (values.isEmpty()) {
+                return 0.0;
+            }
+            final double avg =
+                    values.stream().mapToDouble(Double::doubleValue).average().orElse(0.0);
+            if (avg == 0.0) {
+                // Avoid division by zero below; an average of 0 also implies no data skew.
+                return 0.0;
+            }
+            final double avgAbsDev =
+                    values.stream().mapToDouble(v -> Math.abs(v - avg)).average().orElse(0.0);
+            return Math.min(avgAbsDev / avg * 100.0, 100.0);
+        }
+
+        @Override
+        public String getName() {
+            return NAME;
+        }
+    }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedMetric.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedMetric.java
index e0e2b974065..bc72b8187a3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedMetric.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedMetric.java
@@ -43,6 +43,8 @@ public class AggregatedMetric {
 
     private static final String FIELD_NAME_SUM = "sum";
 
+    private static final String FIELD_NAME_SKEW = "skew";
+
     @JsonProperty(value = FIELD_NAME_ID, required = true)
     private final String id;
 
@@ -62,23 +64,43 @@ public class AggregatedMetric {
     @JsonProperty(FIELD_NAME_SUM)
     private final Double sum;
 
+    /** Aggregated data skew value; {@code null} values are omitted from the JSON output. */
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    @JsonProperty(FIELD_NAME_SKEW)
+    private final Double skew;
+
     @JsonCreator
     public AggregatedMetric(
             final @JsonProperty(value = FIELD_NAME_ID, required = true) String id,
             final @Nullable @JsonProperty(FIELD_NAME_MIN) Double min,
             final @Nullable @JsonProperty(FIELD_NAME_MAX) Double max,
             final @Nullable @JsonProperty(FIELD_NAME_AVG) Double avg,
-            final @Nullable @JsonProperty(FIELD_NAME_SUM) Double sum) {
+            final @Nullable @JsonProperty(FIELD_NAME_SUM) Double sum,
+            final @Nullable @JsonProperty(FIELD_NAME_SKEW) Double skew) {
 
         this.id = requireNonNull(id, "id must not be null");
         this.min = min;
         this.max = max;
         this.avg = avg;
         this.sum = sum;
+        this.skew = skew;
+    }
+
+    /**
+     * Creates an aggregated metric without a skew value; keeps the constructor signature that
+     * existed before the skew aggregation was added so existing callers keep compiling.
+     */
+    public AggregatedMetric(
+            final String id,
+            final @Nullable Double min,
+            final @Nullable Double max,
+            final @Nullable Double avg,
+            final @Nullable Double sum) {
+        this(id, min, max, avg, sum, null);
     }
 
     public AggregatedMetric(final @JsonProperty(value = FIELD_NAME_ID, required = true) String id) {
-        this(id, null, null, null, null);
+        this(id, null, null, null, null, null);
     }
 
     @JsonIgnore
@@ -106,6 +128,12 @@ public class AggregatedMetric {
         return avg;
     }
 
+    /** Returns the aggregated data skew value, or {@code null} if it was not set. */
+    @JsonIgnore
+    public Double getSkew() {
+        return skew;
+    }
+
     @Override
     public String toString() {
         return "AggregatedMetric{"
@@ -124,6 +152,9 @@ public class AggregatedMetric {
                 + ", sum='"
                 + sum
                 + '\''
+                + ", skew='"
+                + skew
+                + '\''
                 + '}';
     }
  }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/MetricsAggregationParameter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/MetricsAggregationParameter.java
index 143c0d6203d..2f1cfcd867a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/MetricsAggregationParameter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/MetricsAggregationParameter.java
@@ -59,6 +59,7 @@ public class MetricsAggregationParameter
         MIN,
         MAX,
         SUM,
-        AVG
+        AVG,
+        SKEW,
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingMetricsHandlerTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingMetricsHandlerTestBase.java
index 365517f8447..9c31860c205 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingMetricsHandlerTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingMetricsHandlerTestBase.java
@@ -384,5 +384,39 @@ abstract class AggregatingMetricsHandlerTestBase<
         assertThat(aggregatedMetric.getMax()).isCloseTo(3.0, within(0.1));
         assertThat(aggregatedMetric.getAvg()).isCloseTo(2.0, within(0.1));
         assertThat(aggregatedMetric.getSum()).isCloseTo(4.0, within(0.1));
+        assertThat(aggregatedMetric.getSkew()).isCloseTo(50.0, within(0.1));
+    }
+
+    @Test
+    void testDataSkewAggregation() throws Exception {
+        // Request only the skew aggregation of a single metric.
+        final Map<String, List<String>> queryParams = new HashMap<>(4);
+        queryParams.put("agg", Collections.singletonList("skew"));
+        queryParams.put("get", Collections.singletonList("abc.metric1"));
+
+        final HandlerRequest<EmptyRequestBody> request =
+                HandlerRequest.resolveParametersAndCreate(
+                        EmptyRequestBody.getInstance(),
+                        handler.getMessageHeaders().getUnresolvedMessageParameters(),
+                        pathParameters,
+                        queryParams,
+                        Collections.emptyList());
+
+        final AggregatedMetricsResponseBody response =
+                handler.handleRequest(request, MOCK_DISPATCHER_GATEWAY).get();
+        final Collection<AggregatedMetric> aggregatedMetrics = response.getMetrics();
+        assertThat(aggregatedMetrics).hasSize(1);
+
+        final AggregatedMetric skewOnly = aggregatedMetrics.iterator().next();
+        assertThat(skewOnly.getId()).isEqualTo("abc.metric1");
+        // abc.metric1 reports the values [1, 3], so avg = 2 and the mean absolute
+        // deviation = (|1 - 2| + |3 - 2|) / 2 = 1, which gives
+        // skew = mean absolute deviation / avg * 100 = 50.
+        assertThat(skewOnly.getSkew()).isCloseTo(50.0, within(0.1));
+        // Only the requested aggregation is populated; the others stay null.
+        assertThat(skewOnly.getMin()).isNull();
+        assertThat(skewOnly.getAvg()).isNull();
+        assertThat(skewOnly.getMax()).isNull();
+        assertThat(skewOnly.getSum()).isNull();
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/DoubleAccumulatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/DoubleAccumulatorTest.java
new file mode 100644
index 00000000000..cdb9b2742e1
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/DoubleAccumulatorTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.metrics;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.within;
+
+/** Tests for {@link DoubleAccumulator.DoubleDataSkew}. */
+class DoubleAccumulatorTest {
+
+    @ParameterizedTest
+    @MethodSource("dataSkewTests")
+    void testDataSkew(double n1, double n2, double n3, double expectedSkew) {
+        DoubleAccumulator.DoubleDataSkew dataSkew =
+                DoubleAccumulator.DoubleDataSkewFactory.get().get(n1);
+        dataSkew.add(n2);
+        dataSkew.add(n3);
+        assertThat(dataSkew.getValue()).isCloseTo(expectedSkew, within(0.01));
+    }
+
+    @Test
+    void testDataSkewOnEmptyList() {
+        DoubleAccumulator.DoubleDataSkew dataSkew = new DoubleAccumulator.DoubleDataSkew();
+        assertThat(dataSkew.getValue()).isEqualTo(0.0);
+    }
+
+    @Test
+    void testDataSkewOnSingleValueList() {
+        DoubleAccumulator.DoubleDataSkew dataSkew =
+                DoubleAccumulator.DoubleDataSkewFactory.get().get(123);
+        assertThat(dataSkew.getValue()).isEqualTo(0.0);
+    }
+
+    /** Three-value data sets, each followed by the expected data skew percentage. */
+    private static Stream<Arguments> dataSkewTests() {
+        return Stream.of(
+                // Avg: (23 + 3 + 10) / 3 = 12
+                // Avg absolute deviation: ((23 - 12) + (12 - 3) + (12 - 10)) / 3 = 7.33
+                // Skew percentage: 7.33 / 12 * 100 = 61.11
+                Arguments.of(23.0, 3.0, 10.0, 61.11),
+                // Avg: (300 + 0 + 0) / 3 = 100
+                // Avg absolute deviation: ((300 - 100) + (100 - 0) + (100 - 0)) / 3 = 133.33
+                // Skew percentage: 133.33 / 100 * 100 = 133.33, capped at 100
+                Arguments.of(300.0, 0.0, 0.0, 100.0),
+                // An all-zero data set must not trigger a division by zero
+                Arguments.of(0.0, 0.0, 0.0, 0.0),
+                // Low skew: avg = 51, avg absolute deviation = (1 + 0 + 1) / 3 = 0.67
+                // Skew percentage: (2 / 3) / 51 * 100 = 1.31 (rounded)
+                Arguments.of(50.0, 51.0, 52.0, 1.31));
+    }
+}


Reply via email to