This is an automated email from the ASF dual-hosted git repository.

tangyun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9a345e9aa39440cb73e46a9140691a25eeec4a68
Author: Yu Chen <yuchen.e...@gmail.com>
AuthorDate: Thu Nov 30 19:58:33 2023 +0800

    [FLINK-33434][runtime-web] Support invoke async-profiler on TaskManager via REST API
---
 .../profiler/job-manager-profiler.component.html   |   2 +-
 .../profiler/task-manager-profiler.component.html} |  22 +--
 .../profiler/task-manager-profiler.component.less  |  29 +++-
 .../profiler/task-manager-profiler.component.ts    | 153 +++++++++++++++++++++
 .../src/app/pages/task-manager/routes.ts           |   8 ++
 .../status/task-manager-status.component.ts        |   3 +-
 .../src/app/services/task-manager.service.ts       |  27 ++++
 .../runtime/resourcemanager/ResourceManager.java   |  35 +++++
 .../resourcemanager/ResourceManagerGateway.java    |  28 ++++
 .../TaskManagerProfilingFileHandler.java           |  82 +++++++++++
 .../taskmanager/TaskManagerProfilingHandler.java   |  86 ++++++++++++
 .../TaskManagerProfilingListHandler.java           | 101 ++++++++++++++
 .../TaskManagerProfilingFileHeaders.java           |  63 +++++++++
 .../TaskManagerProfilingFileMessageParameters.java |  40 ++++++
 .../taskmanager/TaskManagerProfilingHeaders.java   |  79 +++++++++++
 .../TaskManagerProfilingListHeaders.java           |  80 +++++++++++
 .../flink/runtime/taskexecutor/FileType.java       |   3 +
 .../flink/runtime/taskexecutor/TaskExecutor.java   |  21 +++
 .../runtime/taskexecutor/TaskExecutorGateway.java  |  19 +++
 .../TaskExecutorGatewayDecoratorBase.java          |  12 ++
 .../runtime/util/profiler/ProfilingService.java    |   9 +-
 .../runtime/webmonitor/WebMonitorEndpoint.java     |  48 ++++++-
 .../utils/TestingResourceManagerGateway.java       |  49 +++++++
 .../TaskManagerProfilingHandlerTest.java           | 120 ++++++++++++++++
 .../TaskManagerProfilingListHandlerTest.java       | 126 +++++++++++++++++
 .../taskexecutor/TestingTaskExecutorGateway.java   |  16 +++
 .../TestingTaskExecutorGatewayBuilder.java         |  12 ++
 27 files changed, 1237 insertions(+), 36 deletions(-)

diff --git a/flink-runtime-web/web-dashboard/src/app/pages/job-manager/profiler/job-manager-profiler.component.html b/flink-runtime-web/web-dashboard/src/app/pages/job-manager/profiler/job-manager-profiler.component.html
index ab46841f9c6..9d1e62788f4 100644
--- a/flink-runtime-web/web-dashboard/src/app/pages/job-manager/profiler/job-manager-profiler.component.html
+++ b/flink-runtime-web/web-dashboard/src/app/pages/job-manager/profiler/job-manager-profiler.component.html
@@ -69,7 +69,7 @@
       nzType="warning"
       style="margin-top: 10px"
       nzShowIcon
-      nzMessage="Please set the config `rest.profiling.enabled=true` to enable this experimental profiler feature."
+      nzMessage="Please set the config `rest.profiling.enabled: true` to enable this experimental profiler feature."
     ></nz-alert>
   </div>
   <nz-table
diff --git a/flink-runtime-web/web-dashboard/src/app/pages/job-manager/profiler/job-manager-profiler.component.html b/flink-runtime-web/web-dashboard/src/app/pages/task-manager/profiler/task-manager-profiler.component.html
similarity index 80%
copy from flink-runtime-web/web-dashboard/src/app/pages/job-manager/profiler/job-manager-profiler.component.html
copy to flink-runtime-web/web-dashboard/src/app/pages/task-manager/profiler/task-manager-profiler.component.html
index ab46841f9c6..6124cd2a374 100644
--- a/flink-runtime-web/web-dashboard/src/app/pages/job-manager/profiler/job-manager-profiler.component.html
+++ b/flink-runtime-web/web-dashboard/src/app/pages/task-manager/profiler/task-manager-profiler.component.html
@@ -37,30 +37,12 @@
             nz-button
             nzType="primary"
             [nzLoading]="isCreating"
-            [disabled]="!isEnabled"
+            [disabled]="duration === null || !isEnabled"
             (click)="createProfilingInstance()"
             style="margin-left: 10px"
           >
             Create Profiling Instance
           </button>
-          <i
-            class="header-icon"
-            style="margin-left: 10px"
-            nz-icon
-            nz-tooltip
-            [nzTooltipTitle]="titleTemplate"
-            nzTooltipTitle=""
-            nzType="info-circle"
-          ></i>
-          <ng-template #titleTemplate>
-            <span>
-              Please refer to
-              <a href="https://github.com/async-profiler/async-profiler/wiki">
-                async-profiler's wiki
-              </a>
-              for more detailed info of this feature.
-            </span>
-          </ng-template>
         </nz-form-control>
       </nz-form-item>
     </form>
@@ -69,7 +51,7 @@
       nzType="warning"
       style="margin-top: 10px"
       nzShowIcon
-      nzMessage="Please set the config `rest.profiling.enabled=true` to enable this experimental profiler feature."
+      nzMessage="You need to set the config `rest.profiler.enabled: true` to enable this experimental profiler feature."
     ></nz-alert>
   </div>
   <nz-table
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/FileType.java
 
b/flink-runtime-web/web-dashboard/src/app/pages/task-manager/profiler/task-manager-profiler.component.less
similarity index 70%
copy from 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/FileType.java
copy to 
flink-runtime-web/web-dashboard/src/app/pages/task-manager/profiler/task-manager-profiler.component.less
index 2e20fa35d12..3b5de2d4f7f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/FileType.java
+++ 
b/flink-runtime-web/web-dashboard/src/app/pages/task-manager/profiler/task-manager-profiler.component.less
@@ -16,13 +16,28 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.taskexecutor;
+@import "theme";
 
-/** Different file types to request from the {@link TaskExecutor}. */
-public enum FileType {
-    /** The log file type for taskmanager. */
-    LOG,
+:host {
+  display: flex;
+  flex: 1;
 
-    /** The stdout file type for taskmanager. */
-    STDOUT,
+  ::ng-deep {
+    .ant-table-cell {
+      font-size: @font-size-sm;
+    }
+
+    ::-webkit-scrollbar {
+      display: none;
+    }
+
+    nz-table,
+    nz-spin,
+    cdk-virtual-scroll-viewport,
+    nz-table-inner-scroll,
+    .ant-spin-container,
+    .ant-table {
+      height: 100%;
+    }
+  }
 }
diff --git 
a/flink-runtime-web/web-dashboard/src/app/pages/task-manager/profiler/task-manager-profiler.component.ts
 
b/flink-runtime-web/web-dashboard/src/app/pages/task-manager/profiler/task-manager-profiler.component.ts
new file mode 100644
index 00000000000..1bfb50ec137
--- /dev/null
+++ 
b/flink-runtime-web/web-dashboard/src/app/pages/task-manager/profiler/task-manager-profiler.component.ts
@@ -0,0 +1,153 @@
+/*
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+import { CommonModule } from '@angular/common';
+import { ChangeDetectionStrategy, ChangeDetectorRef, Component, OnDestroy, 
OnInit } from '@angular/core';
+import { FormsModule } from '@angular/forms';
+import { ActivatedRoute } from '@angular/router';
+import { Subject } from 'rxjs';
+import { mergeMap, startWith, takeUntil } from 'rxjs/operators';
+
+import {
+  HumanizeWatermarkPipe,
+  HumanizeWatermarkToDatetimePipe
+} from '@flink-runtime-web/components/humanize-watermark.pipe';
+import { ProfilingDetail } from '@flink-runtime-web/interfaces/job-profiler';
+import { StatusService, TaskManagerService } from 
'@flink-runtime-web/services';
+import { NzAlertModule } from 'ng-zorro-antd/alert';
+import { NzButtonModule } from 'ng-zorro-antd/button';
+import { NzCardModule } from 'ng-zorro-antd/card';
+import { NzFormModule } from 'ng-zorro-antd/form';
+import { NzInputNumberModule } from 'ng-zorro-antd/input-number';
+import { NzMessageModule, NzMessageService } from 'ng-zorro-antd/message';
+import { NzSelectModule } from 'ng-zorro-antd/select';
+import { NzSpaceModule } from 'ng-zorro-antd/space';
+import { NzTableModule } from 'ng-zorro-antd/table';
+
+@Component({
+  selector: 'flink-task-manager-profiler',
+  templateUrl: './task-manager-profiler.component.html',
+  styleUrls: ['./task-manager-profiler.component.less'],
+  changeDetection: ChangeDetectionStrategy.OnPush,
+  imports: [
+    NzCardModule,
+    NzFormModule,
+    NzInputNumberModule,
+    HumanizeWatermarkPipe,
+    FormsModule,
+    NzButtonModule,
+    NzAlertModule,
+    NzTableModule,
+    NzMessageModule,
+    CommonModule,
+    NzSpaceModule,
+    HumanizeWatermarkToDatetimePipe,
+    NzSelectModule
+  ],
+  standalone: true
+})
+export class TaskManagerProfilerComponent implements OnInit, OnDestroy {
+  private readonly destroy$ = new Subject<void>();
+  profilingList: ProfilingDetail[] = [];
+  isLoading = true;
+  isCreating = false;
+  duration = 30;
+  selectMode = 'CPU';
+  isEnabled = false;
+  formatterDuration = (value: number): string => `${value} s`;
+  parserDuration = (value: string): string => value.replace(' s', '');
+
+  constructor(
+    private taskManagerService: TaskManagerService,
+    private readonly activatedRoute: ActivatedRoute,
+    private readonly statusService: StatusService,
+    private message: NzMessageService,
+    private cdr: ChangeDetectorRef
+  ) {}
+
+  public createProfilingInstance(): void {
+    if (this.profilingList.length > 0 && this.profilingList[0].status === 
'RUNNING') {
+      this.message.warning('Please wait for last profiling finished.');
+      return;
+    }
+    this.isCreating = true;
+    const taskManagerId = 
this.activatedRoute.parent!.snapshot.params.taskManagerId;
+    this.taskManagerService.createProfilingInstance(taskManagerId, 
this.selectMode, this.duration).subscribe({
+      next: profilingDetail => {
+        this.profilingList.unshift(profilingDetail);
+        this.isCreating = false;
+        this.cdr.markForCheck();
+      },
+      error: () => {
+        this.isCreating = false;
+        this.cdr.markForCheck();
+      }
+    });
+  }
+
+  public ngOnInit(): void {
+    const taskManagerId = 
this.activatedRoute.parent!.snapshot.params.taskManagerId;
+    this.statusService.refresh$
+      .pipe(
+        startWith(true),
+        mergeMap(() => {
+          this.isLoading = true;
+          this.cdr.markForCheck();
+          return this.taskManagerService.loadProfilingList(taskManagerId);
+        }),
+        takeUntil(this.destroy$)
+      )
+      .subscribe({
+        next: data => {
+          this.profilingList = data.profilingList;
+          this.isLoading = false;
+          this.isEnabled = true;
+          this.cdr.markForCheck();
+        },
+        error: () => {
+          this.isLoading = false;
+          this.destroy$.next();
+          this.destroy$.complete();
+          this.cdr.markForCheck();
+        }
+      });
+  }
+
+  public downloadProfilingResult(filePath: string): void {
+    const taskManagerId = 
this.activatedRoute.parent!.snapshot.params.taskManagerId;
+    this.isLoading = true;
+    this.cdr.markForCheck();
+
+    this.taskManagerService.loadProfilingResult(taskManagerId, 
filePath).subscribe({
+      next: data => {
+        const anchor = document.createElement('a');
+        anchor.href = data.url;
+        anchor.download = data.url;
+        document.body.appendChild(anchor);
+        anchor.click();
+      },
+      complete: () => {
+        this.isLoading = false;
+        this.cdr.markForCheck();
+      }
+    });
+  }
+
+  public ngOnDestroy(): void {
+    this.destroy$.next();
+    this.destroy$.complete();
+  }
+}
diff --git 
a/flink-runtime-web/web-dashboard/src/app/pages/task-manager/routes.ts 
b/flink-runtime-web/web-dashboard/src/app/pages/task-manager/routes.ts
index bd72b8fbda7..7c3a9158544 100644
--- a/flink-runtime-web/web-dashboard/src/app/pages/task-manager/routes.ts
+++ b/flink-runtime-web/web-dashboard/src/app/pages/task-manager/routes.ts
@@ -54,6 +54,14 @@ export const TASK_MANAGER_ROUTES: Routes = [
           path: 'thread-dump'
         }
       },
+      {
+        path: 'profiler',
+        loadComponent: () =>
+          import('./profiler/task-manager-profiler.component').then(m => 
m.TaskManagerProfilerComponent),
+        data: {
+          path: 'profiler'
+        }
+      },
       {
         path: 'log-list/:logName',
         loadComponent: () =>
diff --git 
a/flink-runtime-web/web-dashboard/src/app/pages/task-manager/status/task-manager-status.component.ts
 
b/flink-runtime-web/web-dashboard/src/app/pages/task-manager/status/task-manager-status.component.ts
index a8b98d688c0..04db56f8d10 100644
--- 
a/flink-runtime-web/web-dashboard/src/app/pages/task-manager/status/task-manager-status.component.ts
+++ 
b/flink-runtime-web/web-dashboard/src/app/pages/task-manager/status/task-manager-status.component.ts
@@ -52,7 +52,8 @@ export class TaskManagerStatusComponent implements OnInit, 
OnDestroy {
     { path: 'logs', title: 'Logs' },
     { path: 'stdout', title: 'Stdout' },
     { path: 'log-list', title: 'Log List' },
-    { path: 'thread-dump', title: 'Thread Dump' }
+    { path: 'thread-dump', title: 'Thread Dump' },
+    { path: 'profiler', title: 'Profiler' }
   ];
   public taskManagerDetail?: TaskManagerDetail;
   public loading = true;
diff --git 
a/flink-runtime-web/web-dashboard/src/app/services/task-manager.service.ts 
b/flink-runtime-web/web-dashboard/src/app/services/task-manager.service.ts
index 017538ae957..6e387feceec 100644
--- a/flink-runtime-web/web-dashboard/src/app/services/task-manager.service.ts
+++ b/flink-runtime-web/web-dashboard/src/app/services/task-manager.service.ts
@@ -31,6 +31,7 @@ import {
   TaskManagersItem,
   TaskManagerThreadDump
 } from '@flink-runtime-web/interfaces';
+import { ProfilingDetail, ProfilingList } from 
'@flink-runtime-web/interfaces/job-profiler';
 
 import { ConfigService } from './config.service';
 
@@ -119,4 +120,30 @@ export class TaskManagerService {
       .get<{ url: string 
}>(`${this.configService.BASE_URL}/jobs/${jobId}/taskmanagers/${taskManagerId}/log-url`)
       .pipe(map(data => data.url));
   }
+
+  loadProfilingList(taskManagerId: string): Observable<ProfilingList> {
+    return 
this.httpClient.get<ProfilingList>(`${this.configService.BASE_URL}/taskmanagers/${taskManagerId}/profiler`);
+  }
+
+  createProfilingInstance(taskManagerId: string, mode: string, duration: 
number): Observable<ProfilingDetail> {
+    const requestParam = { mode, duration };
+    return this.httpClient.post<ProfilingDetail>(
+      `${this.configService.BASE_URL}/taskmanagers/${taskManagerId}/profiler`,
+      requestParam
+    );
+  }
+
+  loadProfilingResult(taskManagerId: string, filePath: string): 
Observable<Record<string, string>> {
+    const url = 
`${this.configService.BASE_URL}/taskmanagers/${taskManagerId}/profiler/${filePath}`;
+    return this.httpClient
+      .get(url, { responseType: 'text', headers: new 
HttpHeaders().append('Cache-Control', 'no-cache') })
+      .pipe(
+        map(data => {
+          return {
+            data,
+            url
+          };
+        })
+      );
+  }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 400289aa849..efb0d55f517 100755
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -61,6 +61,7 @@ import 
org.apache.flink.runtime.resourcemanager.slotmanager.ResourceAllocator;
 import 
org.apache.flink.runtime.resourcemanager.slotmanager.ResourceEventListener;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
 import org.apache.flink.runtime.rest.messages.LogInfo;
+import org.apache.flink.runtime.rest.messages.ProfilingInfo;
 import org.apache.flink.runtime.rest.messages.ThreadDumpInfo;
 import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerInfo;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
@@ -899,6 +900,40 @@ public abstract class ResourceManager<WorkerType extends 
ResourceIDRetrievable>
         }
     }
 
+    @Override
+    public CompletableFuture<Collection<ProfilingInfo>> 
requestTaskManagerProfilingList(
+            ResourceID taskManagerId, Duration timeout) {
+        final WorkerRegistration<WorkerType> taskExecutor = 
taskExecutors.get(taskManagerId);
+        if (taskExecutor == null) {
+            log.debug(
+                    "Requested profiling list from unregistered TaskExecutor 
{}.",
+                    taskManagerId.getStringWithMetadata());
+            return FutureUtils.completedExceptionally(
+                    new UnknownTaskExecutorException(taskManagerId));
+        } else {
+            return 
taskExecutor.getTaskExecutorGateway().requestProfilingList(timeout);
+        }
+    }
+
+    @Override
+    public CompletableFuture<ProfilingInfo> requestProfiling(
+            ResourceID taskManagerId,
+            int duration,
+            ProfilingInfo.ProfilingMode mode,
+            Duration timeout) {
+        final WorkerRegistration<WorkerType> taskExecutor = 
taskExecutors.get(taskManagerId);
+
+        if (taskExecutor == null) {
+            log.debug(
+                    "Requested profiling from unregistered TaskExecutor {}.",
+                    taskManagerId.getStringWithMetadata());
+            return FutureUtils.completedExceptionally(
+                    new UnknownTaskExecutorException(taskManagerId));
+        } else {
+            return 
taskExecutor.getTaskExecutorGateway().requestProfiling(duration, mode, timeout);
+        }
+    }
+
     @Override
     @Local // Bug; see FLINK-27954
     public CompletableFuture<TaskExecutorThreadInfoGateway> 
requestTaskExecutorThreadInfoGateway(
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
index 83bd69dc3d8..9faf8847493 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
@@ -37,6 +37,8 @@ import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.metrics.dump.MetricQueryService;
 import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.rest.messages.LogInfo;
+import org.apache.flink.runtime.rest.messages.ProfilingInfo;
+import org.apache.flink.runtime.rest.messages.ProfilingInfo.ProfilingMode;
 import org.apache.flink.runtime.rest.messages.ThreadDumpInfo;
 import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerInfo;
 import org.apache.flink.runtime.rpc.FencedRpcGateway;
@@ -288,4 +290,30 @@ public interface ResourceManagerGateway
      */
     CompletableFuture<TaskExecutorThreadInfoGateway> 
requestTaskExecutorThreadInfoGateway(
             ResourceID taskManagerId, @RpcTimeout Time timeout);
+
+    /**
+     * Request profiling list from the given {@link TaskExecutor}.
+     *
+     * @param taskManagerId identifying the {@link TaskExecutor} to get 
profiling list from
+     * @param timeout for the asynchronous operation
+     * @return Future which is completed with the historical profiling list
+     */
+    CompletableFuture<Collection<ProfilingInfo>> 
requestTaskManagerProfilingList(
+            ResourceID taskManagerId, @RpcTimeout Duration timeout);
+
+    /**
+     * Requests the profiling instance from the given {@link TaskExecutor}.
+     *
+     * @param taskManagerId taskManagerId identifying the {@link TaskExecutor} 
to get the profiling
+     *     from
+     * @param duration profiling duration
+     * @param mode profiling mode {@link ProfilingMode}
+     * @param timeout timeout of the asynchronous operation
+     * @return Future containing the created profiling information
+     */
+    CompletableFuture<ProfilingInfo> requestProfiling(
+            ResourceID taskManagerId,
+            int duration,
+            ProfilingInfo.ProfilingMode mode,
+            @RpcTimeout Duration timeout);
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerProfilingFileHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerProfilingFileHandler.java
new file mode 100644
index 00000000000..ae1b4b5f4a6
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerProfilingFileHandler.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.taskmanager;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.blob.TransientBlobKey;
+import org.apache.flink.runtime.blob.TransientBlobService;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.ProfilingFileNamePathParameter;
+import org.apache.flink.runtime.rest.messages.UntypedResponseMessageHeaders;
+import 
org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerProfilingFileMessageParameters;
+import org.apache.flink.runtime.taskexecutor.FileType;
+import org.apache.flink.runtime.taskexecutor.TaskExecutor;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import javax.annotation.Nonnull;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/** Rest handler which serves the profiling result file of the {@link 
TaskExecutor}. */
+public class TaskManagerProfilingFileHandler
+        extends 
AbstractTaskManagerFileHandler<TaskManagerProfilingFileMessageParameters> {
+
+    public TaskManagerProfilingFileHandler(
+            @Nonnull GatewayRetriever<? extends RestfulGateway> 
leaderRetriever,
+            @Nonnull Time timeout,
+            @Nonnull Map<String, String> responseHeaders,
+            @Nonnull
+                    UntypedResponseMessageHeaders<
+                                    EmptyRequestBody, 
TaskManagerProfilingFileMessageParameters>
+                            untypedResponseMessageHeaders,
+            @Nonnull GatewayRetriever<ResourceManagerGateway> 
resourceManagerGatewayRetriever,
+            @Nonnull TransientBlobService transientBlobService,
+            @Nonnull Time cacheEntryDuration) {
+        super(
+                leaderRetriever,
+                timeout,
+                responseHeaders,
+                untypedResponseMessageHeaders,
+                resourceManagerGatewayRetriever,
+                transientBlobService,
+                cacheEntryDuration);
+    }
+
+    @Override
+    protected CompletableFuture<TransientBlobKey> requestFileUpload(
+            ResourceManagerGateway resourceManagerGateway,
+            Tuple2<ResourceID, String> taskManagerIdAndFileName) {
+        return 
resourceManagerGateway.requestTaskManagerFileUploadByNameAndType(
+                taskManagerIdAndFileName.f0,
+                taskManagerIdAndFileName.f1,
+                FileType.PROFILER,
+                getTimeout());
+    }
+
+    @Override
+    protected String getFileName(HandlerRequest<EmptyRequestBody> 
handlerRequest) {
+        return 
handlerRequest.getPathParameter(ProfilingFileNamePathParameter.class);
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerProfilingHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerProfilingHandler.java
new file mode 100644
index 00000000000..b345c9aa135
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerProfilingHandler.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.taskmanager;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import 
org.apache.flink.runtime.rest.handler.resourcemanager.AbstractResourceManagerHandler;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.ProfilingInfo;
+import org.apache.flink.runtime.rest.messages.cluster.ProfilingRequestBody;
+import 
org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerIdPathParameter;
+import 
org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerMessageParameters;
+import org.apache.flink.runtime.taskexecutor.TaskExecutor;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import javax.annotation.Nonnull;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/** Rest handler which serves the profiling service from a {@link 
TaskExecutor}. */
+public class TaskManagerProfilingHandler
+        extends AbstractResourceManagerHandler<
+                RestfulGateway, ProfilingRequestBody, ProfilingInfo, 
TaskManagerMessageParameters> {
+    private final long maxDurationInSeconds;
+
+    public TaskManagerProfilingHandler(
+            GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+            Time timeout,
+            Map<String, String> responseHeaders,
+            MessageHeaders<ProfilingRequestBody, ProfilingInfo, 
TaskManagerMessageParameters>
+                    messageHeaders,
+            GatewayRetriever<ResourceManagerGateway> 
resourceManagerGatewayRetriever,
+            final Configuration configuration) {
+        super(
+                leaderRetriever,
+                timeout,
+                responseHeaders,
+                messageHeaders,
+                resourceManagerGatewayRetriever);
+        this.maxDurationInSeconds =
+                
configuration.get(RestOptions.MAX_PROFILING_DURATION).getSeconds();
+    }
+
+    @Override
+    protected CompletableFuture<ProfilingInfo> handleRequest(
+            @Nonnull HandlerRequest<ProfilingRequestBody> request,
+            @Nonnull ResourceManagerGateway gateway)
+            throws RestHandlerException {
+        ProfilingRequestBody profilingRequest = request.getRequestBody();
+        int duration = profilingRequest.getDuration();
+        if (duration <= 0 || duration > maxDurationInSeconds) {
+            return FutureUtils.completedExceptionally(
+                    new IllegalArgumentException(
+                            String.format(
+                                    "`duration` must be set between (0s, 
%ds].",
+                                    maxDurationInSeconds)));
+        }
+        final ResourceID taskManagerId = 
request.getPathParameter(TaskManagerIdPathParameter.class);
+        return gateway.requestProfiling(
+                taskManagerId, duration, profilingRequest.getMode(), 
getTimeout());
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerProfilingListHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerProfilingListHandler.java
new file mode 100644
index 00000000000..7528d1a9b7b
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerProfilingListHandler.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.taskmanager;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import 
org.apache.flink.runtime.resourcemanager.exceptions.UnknownTaskExecutorException;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import 
org.apache.flink.runtime.rest.handler.resourcemanager.AbstractResourceManagerHandler;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.ProfilingInfo;
+import org.apache.flink.runtime.rest.messages.ProfilingInfoList;
+import 
org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerIdPathParameter;
+import 
org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerMessageParameters;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.Preconditions;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import javax.annotation.Nonnull;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+
+/**
+ * REST handler that asks the ResourceManager for the profiling instances of a single TaskManager
+ * and returns them wrapped in a {@link ProfilingInfoList}.
+ */
+public class TaskManagerProfilingListHandler
+        extends AbstractResourceManagerHandler<
+                RestfulGateway, EmptyRequestBody, ProfilingInfoList, TaskManagerMessageParameters> {
+
+    private final GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever;
+
+    public TaskManagerProfilingListHandler(
+            GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+            Time timeout,
+            Map<String, String> responseHeaders,
+            MessageHeaders<EmptyRequestBody, ProfilingInfoList, TaskManagerMessageParameters>
+                    messageHeaders,
+            GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever) {
+        super(
+                leaderRetriever,
+                timeout,
+                responseHeaders,
+                messageHeaders,
+                resourceManagerGatewayRetriever);
+
+        this.resourceManagerGatewayRetriever =
+                Preconditions.checkNotNull(resourceManagerGatewayRetriever);
+    }
+
+    /**
+     * Requests the profiling list of the TaskManager named by the path parameter.
+     *
+     * @param request REST request carrying the TaskManager id path parameter
+     * @param gateway gateway handed in by the base handler (not used by this implementation)
+     * @return future with the profiling list; fails with a {@code 404} {@link
+     *     RestHandlerException} when the TaskManager is unknown to the ResourceManager
+     */
+    @Override
+    protected CompletableFuture<ProfilingInfoList> handleRequest(
+            @Nonnull HandlerRequest<EmptyRequestBody> request,
+            @Nonnull ResourceManagerGateway gateway)
+            throws RestHandlerException {
+        final ResourceID targetTaskManager =
+                request.getPathParameter(TaskManagerIdPathParameter.class);
+
+        // NOTE(review): the gateway is resolved again from the retriever instead of using the
+        // `gateway` argument; this looks redundant -- confirm before simplifying.
+        return getResourceManagerGateway(resourceManagerGatewayRetriever)
+                .requestTaskManagerProfilingList(targetTaskManager, getTimeout())
+                .thenApply(ProfilingInfoList::new)
+                .exceptionally(
+                        failure -> {
+                            throw toCompletionFailure(targetTaskManager, failure);
+                        });
+    }
+
+    /**
+     * Maps a failed profiling-list request to the exception that fails the REST future: unknown
+     * TaskManagers become a {@code NOT_FOUND} {@link RestHandlerException}, everything else is
+     * re-wrapped unchanged.
+     */
+    private static CompletionException toCompletionFailure(
+            ResourceID taskManagerId, Throwable failure) {
+        final Throwable cause = ExceptionUtils.stripCompletionException(failure);
+        if (!(cause instanceof UnknownTaskExecutorException)) {
+            return new CompletionException(failure);
+        }
+        return new CompletionException(
+                new RestHandlerException(
+                        "Could not find TaskExecutor " + taskManagerId,
+                        HttpResponseStatus.NOT_FOUND,
+                        cause));
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerProfilingFileHeaders.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerProfilingFileHeaders.java
new file mode 100644
index 00000000000..97b1b3b4971
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerProfilingFileHeaders.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.taskmanager;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import 
org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerProfilingFileHandler;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.LogFileNamePathParameter;
+import 
org.apache.flink.runtime.rest.messages.RuntimeUntypedResponseMessageHeaders;
+
+/**
+ * Headers for the {@link TaskManagerProfilingFileHandler}.
+ *
+ * <p>Describes a {@code GET} on {@code /taskmanagers/:<taskmanager id>/profiler/:<file name>}
+ * without a request body. The class is a singleton; use {@link #getInstance()}.
+ */
+public class TaskManagerProfilingFileHeaders
+        implements RuntimeUntypedResponseMessageHeaders<
+                EmptyRequestBody, TaskManagerProfilingFileMessageParameters> {
+
+    private static final TaskManagerProfilingFileHeaders INSTANCE =
+            new TaskManagerProfilingFileHeaders();
+
+    // NOTE(review): the file-name segment is keyed by LogFileNamePathParameter.KEY, whereas
+    // TaskManagerProfilingFileMessageParameters registers a ProfilingFileNamePathParameter.
+    // Confirm both keys are identical; otherwise the path parameter cannot be resolved.
+    private static final String URL =
+            String.format(
+                    "/taskmanagers/:%s/profiler/:%s",
+                    TaskManagerIdPathParameter.KEY, LogFileNamePathParameter.KEY);
+
+    /** Private so that {@link #INSTANCE} stays the only instance, as in the sibling headers. */
+    private TaskManagerProfilingFileHeaders() {}
+
+    /** Returns the shared instance of these headers. */
+    public static TaskManagerProfilingFileHeaders getInstance() {
+        return INSTANCE;
+    }
+
+    @Override
+    public Class<EmptyRequestBody> getRequestClass() {
+        return EmptyRequestBody.class;
+    }
+
+    /** Returns a fresh, unresolved parameter set for every call. */
+    @Override
+    public TaskManagerProfilingFileMessageParameters getUnresolvedMessageParameters() {
+        return new TaskManagerProfilingFileMessageParameters();
+    }
+
+    @Override
+    public HttpMethodWrapper getHttpMethod() {
+        return HttpMethodWrapper.GET;
+    }
+
+    @Override
+    public String getTargetRestEndpointURL() {
+        return URL;
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerProfilingFileMessageParameters.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerProfilingFileMessageParameters.java
new file mode 100644
index 00000000000..6f064543d25
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerProfilingFileMessageParameters.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.taskmanager;
+
+import 
org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerProfilingFileHandler;
+import org.apache.flink.runtime.rest.messages.MessagePathParameter;
+import org.apache.flink.runtime.rest.messages.ProfilingFileNamePathParameter;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * Message parameters of the {@link TaskManagerProfilingFileHandler}: the TaskManager id parameter
+ * taken over from {@link TaskManagerMessageParameters} plus the profiling file name.
+ */
+public class TaskManagerProfilingFileMessageParameters extends TaskManagerMessageParameters {
+
+    public final ProfilingFileNamePathParameter profilingFileNamePathParameter =
+            new ProfilingFileNamePathParameter();
+
+    /** Returns an unmodifiable collection holding the file name and TaskManager id parameters. */
+    @Override
+    public Collection<MessagePathParameter<?>> getPathParameters() {
+        final Collection<MessagePathParameter<?>> pathParameters =
+                Arrays.asList(profilingFileNamePathParameter, taskManagerIdParameter);
+        return Collections.unmodifiableCollection(pathParameters);
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerProfilingHeaders.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerProfilingHeaders.java
new file mode 100644
index 00000000000..bab34f834d5
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerProfilingHeaders.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.taskmanager;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import 
org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerProfilingHandler;
+import org.apache.flink.runtime.rest.messages.ProfilingInfo;
+import org.apache.flink.runtime.rest.messages.RuntimeMessageHeaders;
+import org.apache.flink.runtime.rest.messages.cluster.ProfilingRequestBody;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * Message headers of the REST call served by {@link TaskManagerProfilingHandler}.
+ *
+ * <p>The call is a {@code POST} on {@code /taskmanagers/:<taskmanager id>/profiler} that takes a
+ * {@link ProfilingRequestBody} and answers with a {@link ProfilingInfo}. The class is a singleton;
+ * use {@link #getInstance()}.
+ */
+public class TaskManagerProfilingHeaders
+        implements RuntimeMessageHeaders<
+                ProfilingRequestBody, ProfilingInfo, TaskManagerMessageParameters> {
+
+    private static final TaskManagerProfilingHeaders INSTANCE = new TaskManagerProfilingHeaders();
+
+    private static final String URL =
+            String.format("/taskmanagers/:%s/profiler", TaskManagerIdPathParameter.KEY);
+
+    private TaskManagerProfilingHeaders() {}
+
+    /** Returns the shared instance of these headers. */
+    public static TaskManagerProfilingHeaders getInstance() {
+        return INSTANCE;
+    }
+
+    // ------------------------------ endpoint ------------------------------
+
+    @Override
+    public HttpMethodWrapper getHttpMethod() {
+        return HttpMethodWrapper.POST;
+    }
+
+    @Override
+    public String getTargetRestEndpointURL() {
+        return URL;
+    }
+
+    /** Returns a fresh, unresolved parameter set for every call. */
+    @Override
+    public TaskManagerMessageParameters getUnresolvedMessageParameters() {
+        return new TaskManagerMessageParameters();
+    }
+
+    // ------------------------- request / response -------------------------
+
+    @Override
+    public Class<ProfilingRequestBody> getRequestClass() {
+        return ProfilingRequestBody.class;
+    }
+
+    @Override
+    public Class<ProfilingInfo> getResponseClass() {
+        return ProfilingInfo.class;
+    }
+
+    @Override
+    public HttpResponseStatus getResponseStatusCode() {
+        return HttpResponseStatus.OK;
+    }
+
+    @Override
+    public String getDescription() {
+        return "Returns the profiling instance of the requested TaskManager.";
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerProfilingListHeaders.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerProfilingListHeaders.java
new file mode 100644
index 00000000000..6f8267ee755
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerProfilingListHeaders.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.taskmanager;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import 
org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerProfilingListHandler;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.ProfilingInfoList;
+import org.apache.flink.runtime.rest.messages.RuntimeMessageHeaders;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * Message headers of the REST call served by {@link TaskManagerProfilingListHandler}.
+ *
+ * <p>The call is a {@code GET} on {@code /taskmanagers/:<taskmanager id>/profiler} without a
+ * request body and answers with a {@link ProfilingInfoList}. The class is a singleton; use {@link
+ * #getInstance()}.
+ */
+public class TaskManagerProfilingListHeaders
+        implements RuntimeMessageHeaders<
+                EmptyRequestBody, ProfilingInfoList, TaskManagerMessageParameters> {
+
+    private static final TaskManagerProfilingListHeaders INSTANCE =
+            new TaskManagerProfilingListHeaders();
+
+    private static final String URL =
+            String.format("/taskmanagers/:%s/profiler", TaskManagerIdPathParameter.KEY);
+
+    private TaskManagerProfilingListHeaders() {}
+
+    /** Returns the shared instance of these headers. */
+    public static TaskManagerProfilingListHeaders getInstance() {
+        return INSTANCE;
+    }
+
+    // ------------------------------ endpoint ------------------------------
+
+    @Override
+    public HttpMethodWrapper getHttpMethod() {
+        return HttpMethodWrapper.GET;
+    }
+
+    @Override
+    public String getTargetRestEndpointURL() {
+        return URL;
+    }
+
+    /** Returns a fresh, unresolved parameter set for every call. */
+    @Override
+    public TaskManagerMessageParameters getUnresolvedMessageParameters() {
+        return new TaskManagerMessageParameters();
+    }
+
+    // ------------------------- request / response -------------------------
+
+    @Override
+    public Class<EmptyRequestBody> getRequestClass() {
+        return EmptyRequestBody.class;
+    }
+
+    @Override
+    public Class<ProfilingInfoList> getResponseClass() {
+        return ProfilingInfoList.class;
+    }
+
+    @Override
+    public HttpResponseStatus getResponseStatusCode() {
+        return HttpResponseStatus.OK;
+    }
+
+    @Override
+    public String getDescription() {
+        return "Returns the list of profiling result files on a TaskManager.";
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/FileType.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/FileType.java
index 2e20fa35d12..abb1505c40b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/FileType.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/FileType.java
@@ -25,4 +25,7 @@ public enum FileType {
 
     /** The stdout file type for taskmanager. */
     STDOUT,
+
+    /** The profiler file type for taskmanager. */
+    PROFILER,
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index da753f4b8e4..8ac63f602af 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -92,6 +92,7 @@ import 
org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
 import org.apache.flink.runtime.resourcemanager.TaskExecutorRegistration;
 import org.apache.flink.runtime.rest.messages.LogInfo;
+import org.apache.flink.runtime.rest.messages.ProfilingInfo;
 import org.apache.flink.runtime.rest.messages.ThreadDumpInfo;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
@@ -134,6 +135,7 @@ import 
org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.taskmanager.TaskManagerActions;
 import org.apache.flink.runtime.taskmanager.UnresolvedTaskManagerLocation;
 import org.apache.flink.runtime.util.GroupCache;
+import org.apache.flink.runtime.util.profiler.ProfilingService;
 import org.apache.flink.runtime.webmonitor.threadinfo.ThreadInfoSamplesRequest;
 import org.apache.flink.types.SerializableOptional;
 import org.apache.flink.util.CollectionUtil;
@@ -305,6 +307,8 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
     private final GroupCache<JobID, PermanentBlobKey, ShuffleDescriptorGroup>
             shuffleDescriptorsCache;
 
+    private final ProfilingService profilingService;
+
     public TaskExecutor(
             RpcService rpcService,
             TaskManagerConfiguration taskManagerConfiguration,
@@ -375,6 +379,8 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
         ScheduledExecutorService sampleExecutor =
                 
Executors.newSingleThreadScheduledExecutor(sampleThreadFactory);
         this.threadInfoSampleService = new 
ThreadInfoSampleService(sampleExecutor);
+        this.profilingService =
+                
ProfilingService.getInstance(taskManagerConfiguration.getConfiguration());
 
         this.slotAllocationSnapshotPersistenceService =
                 
taskExecutorServices.getSlotAllocationSnapshotPersistenceService();
@@ -1304,6 +1310,9 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
             case LOG:
                 baseDir = taskManagerConfiguration.getTaskManagerLogDir();
                 break;
+            case PROFILER:
+                baseDir = profilingService.getProfilingResultDir();
+                break;
             default:
                 baseDir = null;
         }
@@ -1411,6 +1420,18 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
         }
     }
 
+    /**
+     * Forwards a profiling request to the {@link ProfilingService}, identifying this TaskExecutor
+     * by its resource id string. The {@code timeout} argument is not used in this method body.
+     */
+    @Override
+    public CompletableFuture<ProfilingInfo> requestProfiling(
+            int duration, ProfilingInfo.ProfilingMode mode, Duration timeout) {
+        final String resourceIdString = getResourceID().getResourceIdString();
+        return profilingService.requestProfiling(resourceIdString, duration, mode);
+    }
+
+    /**
+     * Fetches the profiling list kept by the {@link ProfilingService} under this TaskExecutor's
+     * resource id string. The {@code timeout} argument is not used in this method body.
+     */
+    @Override
+    public CompletableFuture<Collection<ProfilingInfo>> requestProfilingList(Duration timeout) {
+        final String resourceIdString = getResourceID().getResourceIdString();
+        return profilingService.getProfilingList(resourceIdString);
+    }
+
     // ------------------------------------------------------------------------
     //  Internal resource manager connection methods
     // ------------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
index c70e817cb33..d3e9dd119a5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
@@ -39,6 +39,7 @@ import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.operators.coordination.OperatorEvent;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
 import org.apache.flink.runtime.rest.messages.LogInfo;
+import org.apache.flink.runtime.rest.messages.ProfilingInfo;
 import org.apache.flink.runtime.rest.messages.ThreadDumpInfo;
 import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.RpcTimeout;
@@ -314,4 +315,22 @@ public interface TaskExecutorGateway
      */
     CompletableFuture<Acknowledge> updateDelegationTokens(
             ResourceManagerId resourceManagerId, byte[] tokens);
+
+    /**
+     * Requests a profiling run from this TaskManager.
+     *
+     * @param duration profiling duration (presumably in seconds, matching the REST handler's
+     *     max-duration check; confirm against the profiling service)
+     * @param mode profiling mode, see {@link ProfilingInfo.ProfilingMode}
+     * @param timeout timeout for the asynchronous operation
+     * @return future with the {@link ProfilingInfo} describing the requested profiling run
+     */
+    CompletableFuture<ProfilingInfo> requestProfiling(
+            int duration, ProfilingInfo.ProfilingMode mode, @RpcTimeout 
Duration timeout);
+
+    /**
+     * Requests the list of profiling instances known to this TaskManager.
+     *
+     * @param timeout timeout for the asynchronous operation
+     * @return future with a collection of all profiling instances' information
+     */
+    CompletableFuture<Collection<ProfilingInfo>> 
requestProfilingList(@RpcTimeout Duration timeout);
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGatewayDecoratorBase.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGatewayDecoratorBase.java
index e0e8ff7af48..bd180c0de73 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGatewayDecoratorBase.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGatewayDecoratorBase.java
@@ -39,6 +39,7 @@ import 
org.apache.flink.runtime.messages.TaskThreadInfoResponse;
 import org.apache.flink.runtime.operators.coordination.OperatorEvent;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
 import org.apache.flink.runtime.rest.messages.LogInfo;
+import org.apache.flink.runtime.rest.messages.ProfilingInfo;
 import org.apache.flink.runtime.rest.messages.ThreadDumpInfo;
 import org.apache.flink.runtime.webmonitor.threadinfo.ThreadInfoSamplesRequest;
 import org.apache.flink.types.SerializableOptional;
@@ -245,6 +246,17 @@ public class TaskExecutorGatewayDecoratorBase implements 
TaskExecutorGateway {
         return originalGateway.updateDelegationTokens(resourceManagerId, 
tokens);
     }
 
+    /** Pass-through: hands the profiling request to the decorated gateway unchanged. */
+    @Override
+    public CompletableFuture<ProfilingInfo> requestProfiling(
+            int duration, ProfilingInfo.ProfilingMode mode, Duration timeout) {
+        return originalGateway.requestProfiling(duration, mode, timeout);
+    }
+
+    /** Pass-through: hands the profiling list request to the decorated gateway unchanged. */
+    @Override
+    public CompletableFuture<Collection<ProfilingInfo>> 
requestProfilingList(Duration timeout) {
+        return originalGateway.requestProfilingList(timeout);
+    }
+
     @Override
     public CompletableFuture<TaskThreadInfoResponse> requestThreadInfoSamples(
             Collection<ExecutionAttemptID> taskExecutionAttemptIds,
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/profiler/ProfilingService.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/profiler/ProfilingService.java
index d8a67339810..5af2dfd6336 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/profiler/ProfilingService.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/profiler/ProfilingService.java
@@ -179,6 +179,10 @@ public class ProfilingService implements Closeable {
                 profilingMap.getOrDefault(resourceID, new ArrayDeque<>()));
     }
 
+    /**
+     * Returns the configured profiling result directory. The TaskExecutor uses this value as the
+     * base directory when serving files of type {@code PROFILER}.
+     */
+    public String getProfilingResultDir() {
+        return profilingResultDir;
+    }
+
     @VisibleForTesting
     ArrayDeque<ProfilingInfo> getProfilingListForTest(String resourceID) {
         return profilingMap.getOrDefault(resourceID, new ArrayDeque<>());
@@ -194,11 +198,6 @@ public class ProfilingService implements Closeable {
         return profilingFuture;
     }
 
-    @VisibleForTesting
-    public String getProfilingResultDir() {
-        return profilingResultDir;
-    }
-
     enum ProfilerConstants {
         PROFILER_STARTED_SUCCESS("Profiling started"),
         PROFILER_STOPPED_SUCCESS("OK"),
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
index 42fd92f47e5..95f142b4b15 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
@@ -101,6 +101,9 @@ import 
org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerCustomLogHan
 import 
org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerDetailsHandler;
 import 
org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerLogFileHandler;
 import 
org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerLogListHandler;
+import 
org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerProfilingFileHandler;
+import 
org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerProfilingHandler;
+import 
org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerProfilingListHandler;
 import 
org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerStdoutFileHandler;
 import 
org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerThreadDumpHandler;
 import org.apache.flink.runtime.rest.handler.taskmanager.TaskManagersHandler;
@@ -152,6 +155,9 @@ import 
org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerCustomLogHe
 import 
org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerDetailsHeaders;
 import 
org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerLogFileHeaders;
 import 
org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerLogsHeaders;
+import 
org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerProfilingFileHeaders;
+import 
org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerProfilingHeaders;
+import 
org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerProfilingListHeaders;
 import 
org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerStdoutFileHeaders;
 import 
org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerThreadDumpHeaders;
 import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersHeaders;
@@ -964,6 +970,8 @@ public class WebMonitorEndpoint<T extends RestfulGateway> 
extends RestServerEndp
         handlers.add(
                 Tuple2.of(JobManagerThreadDumpHeaders.getInstance(), 
jobManagerThreadDumpHandler));
 
+        final Time cacheEntryDuration = 
Time.milliseconds(restConfiguration.getRefreshInterval());
+
         // load profiler relative handlers
         if (clusterConfiguration.get(RestOptions.ENABLE_PROFILER)) {
             final JobManagerProfilingHandler jobManagerProfilingHandler =
@@ -987,6 +995,32 @@ public class WebMonitorEndpoint<T extends RestfulGateway> 
extends RestServerEndp
                             responseHeaders,
                             JobManagerProfilingFileHeaders.getInstance(),
                             clusterConfiguration);
+
+            final TaskManagerProfilingHandler taskManagerProfilingHandler =
+                    new TaskManagerProfilingHandler(
+                            leaderRetriever,
+                            timeout,
+                            responseHeaders,
+                            TaskManagerProfilingHeaders.getInstance(),
+                            resourceManagerRetriever,
+                            clusterConfiguration);
+
+            final TaskManagerProfilingListHandler 
taskManagerProfilingListHandler =
+                    new TaskManagerProfilingListHandler(
+                            leaderRetriever,
+                            timeout,
+                            responseHeaders,
+                            TaskManagerProfilingListHeaders.getInstance(),
+                            resourceManagerRetriever);
+            final TaskManagerProfilingFileHandler 
taskManagerProfilingFileHandler =
+                    new TaskManagerProfilingFileHandler(
+                            leaderRetriever,
+                            timeout,
+                            responseHeaders,
+                            TaskManagerProfilingFileHeaders.getInstance(),
+                            resourceManagerRetriever,
+                            transientBlobService,
+                            cacheEntryDuration);
             handlers.add(
                     Tuple2.of(
                             JobManagerProfilingHeaders.getInstance(), 
jobManagerProfilingHandler));
@@ -998,12 +1032,22 @@ public class WebMonitorEndpoint<T extends 
RestfulGateway> extends RestServerEndp
                     Tuple2.of(
                             JobManagerProfilingFileHeaders.getInstance(),
                             jobManagerProfilingFileHandler));
+            handlers.add(
+                    Tuple2.of(
+                            TaskManagerProfilingHeaders.getInstance(),
+                            taskManagerProfilingHandler));
+            handlers.add(
+                    Tuple2.of(
+                            TaskManagerProfilingListHeaders.getInstance(),
+                            taskManagerProfilingListHandler));
+            handlers.add(
+                    Tuple2.of(
+                            TaskManagerProfilingFileHeaders.getInstance(),
+                            taskManagerProfilingFileHandler));
         }
 
         // TaskManager log and stdout file handler
 
-        final Time cacheEntryDuration = 
Time.milliseconds(restConfiguration.getRefreshInterval());
-
         final TaskManagerLogFileHandler taskManagerLogFileHandler =
                 new TaskManagerLogFileHandler(
                         leaderRetriever,
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
index 91e13eb34f5..088fed278bb 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
@@ -45,6 +45,7 @@ import 
org.apache.flink.runtime.resourcemanager.TaskExecutorRegistration;
 import org.apache.flink.runtime.resourcemanager.TaskManagerInfoWithSlots;
 import 
org.apache.flink.runtime.resourcemanager.exceptions.UnknownTaskExecutorException;
 import org.apache.flink.runtime.rest.messages.LogInfo;
+import org.apache.flink.runtime.rest.messages.ProfilingInfo;
 import org.apache.flink.runtime.rest.messages.ThreadDumpInfo;
 import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerInfo;
 import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
@@ -123,6 +124,12 @@ public class TestingResourceManagerGateway implements 
ResourceManagerGateway {
     private volatile Function<ResourceID, CompletableFuture<ThreadDumpInfo>>
             requestThreadDumpFunction;
 
+    private volatile Function<ResourceID, CompletableFuture<ProfilingInfo>>
+            requestProfilingFunction;
+
+    private volatile Function<ResourceID, 
CompletableFuture<Collection<ProfilingInfo>>>
+            requestProfilingListFunction;
+
     private volatile BiFunction<JobMasterId, ResourceRequirements, 
CompletableFuture<Acknowledge>>
             declareRequiredResourcesFunction =
                     (ignoredA, ignoredB) ->
@@ -249,6 +256,17 @@ public class TestingResourceManagerGateway implements 
ResourceManagerGateway {
         this.requestThreadDumpFunction = requestThreadDumpFunction;
     }
 
+    /**
+     * Installs the function used to answer {@code requestProfiling} calls on this gateway.
+     *
+     * @param requestProfilingFunction receives the requested TaskManager's {@link ResourceID}
+     *     and supplies the future to return; when {@code null}, {@code requestProfiling}
+     *     answers with a future failed by an {@link UnknownTaskExecutorException}
+     */
+    public void setRequestProfilingFunction(
+            Function<ResourceID, CompletableFuture<ProfilingInfo>> requestProfilingFunction) {
+        this.requestProfilingFunction = requestProfilingFunction;
+    }
+
+    /**
+     * Installs the function used to answer {@code requestTaskManagerProfilingList} calls.
+     *
+     * @param requestProfilingListFunction receives the requested TaskManager's {@link
+     *     ResourceID} and supplies the future to return; when {@code null}, the list request
+     *     answers with a future failed by an {@link UnknownTaskExecutorException}
+     */
+    public void setRequestProfilingListFunction(
+            Function<ResourceID, CompletableFuture<Collection<ProfilingInfo>>>
+                    requestProfilingListFunction) {
+        this.requestProfilingListFunction = requestProfilingListFunction;
+    }
+
     public void setDeclareRequiredResourcesFunction(
             BiFunction<JobMasterId, ResourceRequirements, 
CompletableFuture<Acknowledge>>
                     declareRequiredResourcesFunction) {
@@ -507,6 +525,37 @@ public class TestingResourceManagerGateway implements 
ResourceManagerGateway {
         }
     }
 
+    /**
+     * Answers a profiling-list request by delegating to the configured test function.
+     *
+     * @param taskManagerId id passed on to the configured function
+     * @param timeout ignored by this testing implementation
+     * @return the future produced by the configured function, or a future failed with an
+     *     {@link UnknownTaskExecutorException} when no function has been configured
+     */
+    @Override
+    public CompletableFuture<Collection<ProfilingInfo>> requestTaskManagerProfilingList(
+            ResourceID taskManagerId, Duration timeout) {
+        // Read the volatile field once so the null check and the call use the same instance.
+        final Function<ResourceID, CompletableFuture<Collection<ProfilingInfo>>> listProvider =
+                this.requestProfilingListFunction;
+
+        if (listProvider == null) {
+            return FutureUtils.completedExceptionally(
+                    new UnknownTaskExecutorException(taskManagerId));
+        }
+        return listProvider.apply(taskManagerId);
+    }
+
+    /**
+     * Answers a profiling request by delegating to the configured test function.
+     *
+     * @param taskManagerId id passed on to the configured function
+     * @param duration ignored by this testing implementation
+     * @param mode ignored by this testing implementation
+     * @param timeout ignored by this testing implementation
+     * @return the future produced by the configured function, or a future failed with an
+     *     {@link UnknownTaskExecutorException} when no function has been configured
+     */
+    @Override
+    public CompletableFuture<ProfilingInfo> requestProfiling(
+            ResourceID taskManagerId,
+            int duration,
+            ProfilingInfo.ProfilingMode mode,
+            Duration timeout) {
+        // Read the volatile field once so the null check and the call use the same instance.
+        final Function<ResourceID, CompletableFuture<ProfilingInfo>> profilingProvider =
+                this.requestProfilingFunction;
+
+        if (profilingProvider == null) {
+            return FutureUtils.completedExceptionally(
+                    new UnknownTaskExecutorException(taskManagerId));
+        }
+        return profilingProvider.apply(taskManagerId);
+    }
+
     @Override
     public ResourceManagerId getFencingToken() {
         return resourceManagerId;
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerProfilingHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerProfilingHandlerTest.java
new file mode 100644
index 00000000000..b187ad8097b
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerProfilingHandlerTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.taskmanager;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import 
org.apache.flink.runtime.resourcemanager.exceptions.UnknownTaskExecutorException;
+import 
org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.HandlerRequestException;
+import org.apache.flink.runtime.rest.messages.ProfilingInfo;
+import org.apache.flink.runtime.rest.messages.cluster.ProfilingRequestBody;
+import 
org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerIdPathParameter;
+import 
org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerMessageParameters;
+import 
org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerProfilingHeaders;
+import org.apache.flink.testutils.TestingUtils;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.nio.file.Path;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for the {@link TaskManagerProfilingHandler}. */
+class TaskManagerProfilingHandlerTest {
+
+    private static final ResourceID EXPECTED_TASK_MANAGER_ID = 
ResourceID.generate();
+    private TestingResourceManagerGateway resourceManagerGateway;
+    private TaskManagerProfilingHandler taskManagerProfilingHandler;
+    private HandlerRequest<ProfilingRequestBody> handlerRequest;
+
+    @BeforeEach
+    void setUp(@TempDir Path tempDir) throws HandlerRequestException {
+        Configuration clusterConfiguration = new Configuration();
+        clusterConfiguration.set(RestOptions.MAX_PROFILING_HISTORY_SIZE, 3);
+        clusterConfiguration.set(RestOptions.PROFILING_RESULT_DIR, 
tempDir.toString());
+        resourceManagerGateway = new TestingResourceManagerGateway();
+        taskManagerProfilingHandler =
+                new TaskManagerProfilingHandler(
+                        () -> CompletableFuture.completedFuture(null),
+                        TestingUtils.TIMEOUT,
+                        Collections.emptyMap(),
+                        TaskManagerProfilingHeaders.getInstance(),
+                        () -> 
CompletableFuture.completedFuture(resourceManagerGateway),
+                        clusterConfiguration);
+        handlerRequest = createRequest(EXPECTED_TASK_MANAGER_ID);
+    }
+
+    @Test
+    void testGetTaskManagerProfiling() throws Exception {
+        ProfilingInfo profilingInfo = ProfilingInfo.create(30, 
ProfilingInfo.ProfilingMode.ITIMER);
+        resourceManagerGateway.setRequestProfilingFunction(
+                EXPECTED_TASK_MANAGER_ID -> 
CompletableFuture.completedFuture(profilingInfo));
+        ProfilingInfo profilingInfoResp =
+                taskManagerProfilingHandler
+                        .handleRequest(handlerRequest, resourceManagerGateway)
+                        .get();
+        assertThat(profilingInfoResp).isEqualTo(profilingInfo);
+    }
+
+    @Test
+    void testGetTaskManagerProfilingForUnknownTaskExecutorException() throws 
Exception {
+        resourceManagerGateway.setRequestProfilingListFunction(
+                EXPECTED_TASK_MANAGER_ID ->
+                        FutureUtils.completedExceptionally(
+                                new 
UnknownTaskExecutorException(EXPECTED_TASK_MANAGER_ID)));
+        try {
+            taskManagerProfilingHandler.handleRequest(handlerRequest, 
resourceManagerGateway).get();
+        } catch (ExecutionException e) {
+            final Throwable cause = e.getCause();
+            assertThat(cause).isInstanceOf(UnknownTaskExecutorException.class);
+
+            final UnknownTaskExecutorException unknownTaskExecutorException =
+                    (UnknownTaskExecutorException) cause;
+            assertThat(unknownTaskExecutorException.getMessage())
+                    .contains("No TaskExecutor registered under " + 
EXPECTED_TASK_MANAGER_ID);
+        }
+    }
+
+    private static HandlerRequest<ProfilingRequestBody> 
createRequest(ResourceID taskManagerId)
+            throws HandlerRequestException {
+        Map<String, String> pathParameters = new HashMap<>();
+        pathParameters.put(TaskManagerIdPathParameter.KEY, 
taskManagerId.toString());
+        Map<String, List<String>> queryParameters = Collections.emptyMap();
+        ProfilingRequestBody requestBody =
+                new ProfilingRequestBody(ProfilingInfo.ProfilingMode.ITIMER, 
10);
+        return HandlerRequest.resolveParametersAndCreate(
+                requestBody,
+                new TaskManagerMessageParameters(),
+                pathParameters,
+                queryParameters,
+                Collections.emptyList());
+    }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerProfilingListHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerProfilingListHandlerTest.java
new file mode 100644
index 00000000000..ab99d3ee513
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerProfilingListHandlerTest.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.taskmanager;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import 
org.apache.flink.runtime.resourcemanager.exceptions.UnknownTaskExecutorException;
+import 
org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.HandlerRequestException;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.ProfilingInfo;
+import org.apache.flink.runtime.rest.messages.ProfilingInfoList;
+import 
org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerIdPathParameter;
+import 
org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerMessageParameters;
+import 
org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerProfilingListHeaders;
+import org.apache.flink.testutils.TestingUtils;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for the {@link TaskManagerProfilingListHandler}. */
+class TaskManagerProfilingListHandlerTest {
+
+    private static final ResourceID EXPECTED_TASK_MANAGER_ID = 
ResourceID.generate();
+    private TestingResourceManagerGateway resourceManagerGateway;
+    private TaskManagerProfilingListHandler taskManagerProfilingListHandler;
+    private HandlerRequest<EmptyRequestBody> handlerRequest;
+
+    @BeforeEach
+    void setUp() throws HandlerRequestException {
+        resourceManagerGateway = new TestingResourceManagerGateway();
+        taskManagerProfilingListHandler =
+                new TaskManagerProfilingListHandler(
+                        () -> CompletableFuture.completedFuture(null),
+                        TestingUtils.TIMEOUT,
+                        Collections.emptyMap(),
+                        TaskManagerProfilingListHeaders.getInstance(),
+                        () -> 
CompletableFuture.completedFuture(resourceManagerGateway));
+        handlerRequest = createRequest(EXPECTED_TASK_MANAGER_ID);
+    }
+
+    @Test
+    void testGetTaskManagerProfilingList() throws Exception {
+        ProfilingInfoList profilingInfoList =
+                new ProfilingInfoList(
+                        Arrays.asList(
+                                ProfilingInfo.create(30, 
ProfilingInfo.ProfilingMode.ITIMER),
+                                ProfilingInfo.create(10, 
ProfilingInfo.ProfilingMode.CPU),
+                                ProfilingInfo.create(15, 
ProfilingInfo.ProfilingMode.ALLOC)));
+        resourceManagerGateway.setRequestProfilingListFunction(
+                EXPECTED_TASK_MANAGER_ID ->
+                        
CompletableFuture.completedFuture(profilingInfoList.getProfilingInfos()));
+        ProfilingInfoList profilingInfoListResp =
+                taskManagerProfilingListHandler
+                        .handleRequest(handlerRequest, resourceManagerGateway)
+                        .get();
+        assertThat(profilingInfoListResp.getProfilingInfos())
+                
.containsExactlyInAnyOrderElementsOf(profilingInfoList.getProfilingInfos());
+    }
+
+    @Test
+    void testGetTaskManagerProfilingListForUnknownTaskExecutorException() 
throws Exception {
+        resourceManagerGateway.setRequestProfilingListFunction(
+                EXPECTED_TASK_MANAGER_ID ->
+                        FutureUtils.completedExceptionally(
+                                new 
UnknownTaskExecutorException(EXPECTED_TASK_MANAGER_ID)));
+        try {
+            taskManagerProfilingListHandler
+                    .handleRequest(handlerRequest, resourceManagerGateway)
+                    .get();
+        } catch (ExecutionException e) {
+            final Throwable cause = e.getCause();
+            assertThat(cause).isInstanceOf(RestHandlerException.class);
+
+            final RestHandlerException restHandlerException = 
(RestHandlerException) cause;
+            assertThat(restHandlerException.getHttpResponseStatus())
+                    .isEqualTo(HttpResponseStatus.NOT_FOUND);
+            assertThat(restHandlerException.getMessage())
+                    .contains("Could not find TaskExecutor " + 
EXPECTED_TASK_MANAGER_ID);
+        }
+    }
+
+    private static HandlerRequest<EmptyRequestBody> createRequest(ResourceID 
taskManagerId)
+            throws HandlerRequestException {
+        Map<String, String> pathParameters = new HashMap<>();
+        pathParameters.put(TaskManagerIdPathParameter.KEY, 
taskManagerId.toString());
+        Map<String, List<String>> queryParameters = Collections.emptyMap();
+
+        return HandlerRequest.resolveParametersAndCreate(
+                EmptyRequestBody.getInstance(),
+                new TaskManagerMessageParameters(),
+                pathParameters,
+                queryParameters,
+                Collections.emptyList());
+    }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
index 7ea5ee1b08e..765206b6728 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
@@ -40,6 +40,7 @@ import 
org.apache.flink.runtime.messages.TaskThreadInfoResponse;
 import org.apache.flink.runtime.operators.coordination.OperatorEvent;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
 import org.apache.flink.runtime.rest.messages.LogInfo;
+import org.apache.flink.runtime.rest.messages.ProfilingInfo;
 import org.apache.flink.runtime.rest.messages.ThreadDumpInfo;
 import org.apache.flink.runtime.webmonitor.threadinfo.ThreadInfoSamplesRequest;
 import org.apache.flink.types.SerializableOptional;
@@ -105,6 +106,8 @@ public class TestingTaskExecutorGateway implements 
TaskExecutorGateway {
 
     private final Supplier<CompletableFuture<ThreadDumpInfo>> 
requestThreadDumpSupplier;
 
+    private final Supplier<CompletableFuture<ProfilingInfo>> 
requestProfilingSupplier;
+
     private final Supplier<CompletableFuture<TaskThreadInfoResponse>>
             requestThreadInfoSamplesSupplier;
 
@@ -153,6 +156,7 @@ public class TestingTaskExecutorGateway implements 
TaskExecutorGateway {
                             CompletableFuture<Acknowledge>>
                     operatorEventHandler,
             Supplier<CompletableFuture<ThreadDumpInfo>> 
requestThreadDumpSupplier,
+            Supplier<CompletableFuture<ProfilingInfo>> 
requestProfilingSupplier,
             Supplier<CompletableFuture<TaskThreadInfoResponse>> 
requestThreadInfoSamplesSupplier,
             QuadFunction<
                             ExecutionAttemptID,
@@ -182,6 +186,7 @@ public class TestingTaskExecutorGateway implements 
TaskExecutorGateway {
         this.releaseClusterPartitionsConsumer = 
releaseClusterPartitionsConsumer;
         this.operatorEventHandler = operatorEventHandler;
         this.requestThreadDumpSupplier = requestThreadDumpSupplier;
+        this.requestProfilingSupplier = requestProfilingSupplier;
         this.requestThreadInfoSamplesSupplier = 
requestThreadInfoSamplesSupplier;
         this.triggerCheckpointFunction = triggerCheckpointFunction;
         this.confirmCheckpointFunction = confirmCheckpointFunction;
@@ -352,6 +357,17 @@ public class TestingTaskExecutorGateway implements 
TaskExecutorGateway {
         return CompletableFuture.completedFuture(Acknowledge.get());
     }
 
+    /**
+     * Returns whatever the injected profiling supplier produces.
+     *
+     * @param duration ignored by this testing implementation
+     * @param mode ignored by this testing implementation
+     * @param timeout ignored by this testing implementation
+     * @return the future obtained from {@code requestProfilingSupplier}
+     */
+    @Override
+    public CompletableFuture<ProfilingInfo> requestProfiling(
+            int duration, ProfilingInfo.ProfilingMode mode, Duration timeout) {
+        final CompletableFuture<ProfilingInfo> suppliedResponse = requestProfilingSupplier.get();
+        return suppliedResponse;
+    }
+
+    /**
+     * Listing profiling results is not supported by this testing gateway.
+     *
+     * @param timeout ignored by this testing implementation
+     * @return a future failed with an {@link UnsupportedOperationException}
+     */
+    @Override
+    public CompletableFuture<Collection<ProfilingInfo>> requestProfilingList(Duration timeout) {
+        final UnsupportedOperationException unsupported = new UnsupportedOperationException();
+        return FutureUtils.completedExceptionally(unsupported);
+    }
+
     @Override
     public String getAddress() {
         return address;
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGatewayBuilder.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGatewayBuilder.java
index 283a6c9ef35..31f5df12337 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGatewayBuilder.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGatewayBuilder.java
@@ -36,6 +36,7 @@ import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.TaskThreadInfoResponse;
 import org.apache.flink.runtime.operators.coordination.OperatorEvent;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
+import org.apache.flink.runtime.rest.messages.ProfilingInfo;
 import org.apache.flink.runtime.rest.messages.ThreadDumpInfo;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.concurrent.FutureUtils;
@@ -96,6 +97,9 @@ public class TestingTaskExecutorGatewayBuilder {
                     (a, b, c) -> 
CompletableFuture.completedFuture(Acknowledge.get());
     private static final Supplier<CompletableFuture<ThreadDumpInfo>> 
DEFAULT_THREAD_DUMP_SUPPLIER =
             () -> FutureUtils.completedExceptionally(new 
UnsupportedOperationException());
+
+    private static final Supplier<CompletableFuture<ProfilingInfo>> 
DEFAULT_PROFILING_SUPPLIER =
+            () -> FutureUtils.completedExceptionally(new 
UnsupportedOperationException());
     private static final Supplier<CompletableFuture<TaskThreadInfoResponse>>
             DEFAULT_THREAD_INFO_SAMPLES_SUPPLIER =
                     () -> FutureUtils.completedExceptionally(new 
UnsupportedOperationException());
@@ -151,6 +155,8 @@ public class TestingTaskExecutorGatewayBuilder {
             operatorEventHandler = DEFAULT_OPERATOR_EVENT_HANDLER;
     private Supplier<CompletableFuture<ThreadDumpInfo>> 
requestThreadDumpSupplier =
             DEFAULT_THREAD_DUMP_SUPPLIER;
+    private Supplier<CompletableFuture<ProfilingInfo>> 
requestProfilingSupplier =
+            DEFAULT_PROFILING_SUPPLIER;
 
     private Supplier<CompletableFuture<TaskThreadInfoResponse>> 
requestThreadInfoSamplesSupplier =
             DEFAULT_THREAD_INFO_SAMPLES_SUPPLIER;
@@ -281,6 +287,11 @@ public class TestingTaskExecutorGatewayBuilder {
         this.requestThreadDumpSupplier = requestThreadDumpSupplier;
     }
 
+    /**
+     * Sets the supplier whose result the built gateway returns from {@code requestProfiling}.
+     *
+     * @param requestProfilingSupplier supplier of the future to return
+     * @return this builder, so that calls can be chained
+     */
+    public TestingTaskExecutorGatewayBuilder setRequestProfilingSupplier(
+            Supplier<CompletableFuture<ProfilingInfo>> requestProfilingSupplier) {
+        this.requestProfilingSupplier = requestProfilingSupplier;
+        return this;
+    }
+
     public TestingTaskExecutorGatewayBuilder 
setRequestThreadInfoSamplesSupplier(
             Supplier<CompletableFuture<TaskThreadInfoResponse>> 
requestThreadInfoSamplesSupplier) {
         this.requestThreadInfoSamplesSupplier = 
requestThreadInfoSamplesSupplier;
@@ -325,6 +336,7 @@ public class TestingTaskExecutorGatewayBuilder {
                 releaseClusterPartitionsConsumer,
                 operatorEventHandler,
                 requestThreadDumpSupplier,
+                requestProfilingSupplier,
                 requestThreadInfoSamplesSupplier,
                 triggerCheckpointFunction,
                 confirmCheckpointFunction);

Reply via email to