This is an automated email from the ASF dual-hosted git repository. tangyun pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 9a345e9aa39440cb73e46a9140691a25eeec4a68 Author: Yu Chen <yuchen.e...@gmail.com> AuthorDate: Thu Nov 30 19:58:33 2023 +0800 [FLINK-33434][runtime-web] Support invoke async-profiler on TaskManager via REST API --- .../profiler/job-manager-profiler.component.html | 2 +- .../profiler/task-manager-profiler.component.html} | 22 +-- .../profiler/task-manager-profiler.component.less | 29 +++- .../profiler/task-manager-profiler.component.ts | 153 +++++++++++++++++++++ .../src/app/pages/task-manager/routes.ts | 8 ++ .../status/task-manager-status.component.ts | 3 +- .../src/app/services/task-manager.service.ts | 27 ++++ .../runtime/resourcemanager/ResourceManager.java | 35 +++++ .../resourcemanager/ResourceManagerGateway.java | 28 ++++ .../TaskManagerProfilingFileHandler.java | 82 +++++++++++ .../taskmanager/TaskManagerProfilingHandler.java | 86 ++++++++++++ .../TaskManagerProfilingListHandler.java | 101 ++++++++++++++ .../TaskManagerProfilingFileHeaders.java | 63 +++++++++ .../TaskManagerProfilingFileMessageParameters.java | 40 ++++++ .../taskmanager/TaskManagerProfilingHeaders.java | 79 +++++++++++ .../TaskManagerProfilingListHeaders.java | 80 +++++++++++ .../flink/runtime/taskexecutor/FileType.java | 3 + .../flink/runtime/taskexecutor/TaskExecutor.java | 21 +++ .../runtime/taskexecutor/TaskExecutorGateway.java | 19 +++ .../TaskExecutorGatewayDecoratorBase.java | 12 ++ .../runtime/util/profiler/ProfilingService.java | 9 +- .../runtime/webmonitor/WebMonitorEndpoint.java | 48 ++++++- .../utils/TestingResourceManagerGateway.java | 49 +++++++ .../TaskManagerProfilingHandlerTest.java | 120 ++++++++++++++++ .../TaskManagerProfilingListHandlerTest.java | 126 +++++++++++++++++ .../taskexecutor/TestingTaskExecutorGateway.java | 16 +++ .../TestingTaskExecutorGatewayBuilder.java | 12 ++ 27 files changed, 1237 insertions(+), 36 deletions(-) diff --git a/flink-runtime-web/web-dashboard/src/app/pages/job-manager/profiler/job-manager-profiler.component.html 
b/flink-runtime-web/web-dashboard/src/app/pages/job-manager/profiler/job-manager-profiler.component.html index ab46841f9c6..9d1e62788f4 100644 --- a/flink-runtime-web/web-dashboard/src/app/pages/job-manager/profiler/job-manager-profiler.component.html +++ b/flink-runtime-web/web-dashboard/src/app/pages/job-manager/profiler/job-manager-profiler.component.html @@ -69,7 +69,7 @@ nzType="warning" style="margin-top: 10px" nzShowIcon - nzMessage="Please set the config `rest.profiling.enabled=true` to enable this experimental profiler feature." + nzMessage="Please set the config `rest.profiling.enabled: true` to enable this experimental profiler feature." ></nz-alert> </div> <nz-table diff --git a/flink-runtime-web/web-dashboard/src/app/pages/job-manager/profiler/job-manager-profiler.component.html b/flink-runtime-web/web-dashboard/src/app/pages/task-manager/profiler/task-manager-profiler.component.html similarity index 80% copy from flink-runtime-web/web-dashboard/src/app/pages/job-manager/profiler/job-manager-profiler.component.html copy to flink-runtime-web/web-dashboard/src/app/pages/task-manager/profiler/task-manager-profiler.component.html index ab46841f9c6..6124cd2a374 100644 --- a/flink-runtime-web/web-dashboard/src/app/pages/job-manager/profiler/job-manager-profiler.component.html +++ b/flink-runtime-web/web-dashboard/src/app/pages/task-manager/profiler/task-manager-profiler.component.html @@ -37,30 +37,12 @@ nz-button nzType="primary" [nzLoading]="isCreating" - [disabled]="!isEnabled" + [disabled]="duration === null || !isEnabled" (click)="createProfilingInstance()" style="margin-left: 10px" > Create Profiling Instance </button> - <i - class="header-icon" - style="margin-left: 10px" - nz-icon - nz-tooltip - [nzTooltipTitle]="titleTemplate" - nzTooltipTitle="" - nzType="info-circle" - ></i> - <ng-template #titleTemplate> - <span> - Please refer to - <a href="https://github.com/async-profiler/async-profiler/wiki"> - async-profiler's wiki - </a> - for more detailed 
info of this feature. - </span> - </ng-template> </nz-form-control> </nz-form-item> </form> @@ -69,7 +51,7 @@ nzType="warning" style="margin-top: 10px" nzShowIcon - nzMessage="Please set the config `rest.profiling.enabled=true` to enable this experimental profiler feature." + nzMessage="You need to set the config `rest.profiler.enabled: true` to enable this experimental profiler feature." ></nz-alert> </div> <nz-table diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/FileType.java b/flink-runtime-web/web-dashboard/src/app/pages/task-manager/profiler/task-manager-profiler.component.less similarity index 70% copy from flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/FileType.java copy to flink-runtime-web/web-dashboard/src/app/pages/task-manager/profiler/task-manager-profiler.component.less index 2e20fa35d12..3b5de2d4f7f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/FileType.java +++ b/flink-runtime-web/web-dashboard/src/app/pages/task-manager/profiler/task-manager-profiler.component.less @@ -16,13 +16,28 @@ * limitations under the License. */ -package org.apache.flink.runtime.taskexecutor; +@import "theme"; -/** Different file types to request from the {@link TaskExecutor}. */ -public enum FileType { - /** The log file type for taskmanager. */ - LOG, +:host { + display: flex; + flex: 1; - /** The stdout file type for taskmanager. 
*/ - STDOUT, + ::ng-deep { + .ant-table-cell { + font-size: @font-size-sm; + } + + ::-webkit-scrollbar { + display: none; + } + + nz-table, + nz-spin, + cdk-virtual-scroll-viewport, + nz-table-inner-scroll, + .ant-spin-container, + .ant-table { + height: 100%; + } + } } diff --git a/flink-runtime-web/web-dashboard/src/app/pages/task-manager/profiler/task-manager-profiler.component.ts b/flink-runtime-web/web-dashboard/src/app/pages/task-manager/profiler/task-manager-profiler.component.ts new file mode 100644 index 00000000000..1bfb50ec137 --- /dev/null +++ b/flink-runtime-web/web-dashboard/src/app/pages/task-manager/profiler/task-manager-profiler.component.ts @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +import { CommonModule } from '@angular/common'; +import { ChangeDetectionStrategy, ChangeDetectorRef, Component, OnDestroy, OnInit } from '@angular/core'; +import { FormsModule } from '@angular/forms'; +import { ActivatedRoute } from '@angular/router'; +import { Subject } from 'rxjs'; +import { mergeMap, startWith, takeUntil } from 'rxjs/operators'; + +import { + HumanizeWatermarkPipe, + HumanizeWatermarkToDatetimePipe +} from '@flink-runtime-web/components/humanize-watermark.pipe'; +import { ProfilingDetail } from '@flink-runtime-web/interfaces/job-profiler'; +import { StatusService, TaskManagerService } from '@flink-runtime-web/services'; +import { NzAlertModule } from 'ng-zorro-antd/alert'; +import { NzButtonModule } from 'ng-zorro-antd/button'; +import { NzCardModule } from 'ng-zorro-antd/card'; +import { NzFormModule } from 'ng-zorro-antd/form'; +import { NzInputNumberModule } from 'ng-zorro-antd/input-number'; +import { NzMessageModule, NzMessageService } from 'ng-zorro-antd/message'; +import { NzSelectModule } from 'ng-zorro-antd/select'; +import { NzSpaceModule } from 'ng-zorro-antd/space'; +import { NzTableModule } from 'ng-zorro-antd/table'; + +@Component({ + selector: 'flink-task-manager-profiler', + templateUrl: './task-manager-profiler.component.html', + styleUrls: ['./task-manager-profiler.component.less'], + changeDetection: ChangeDetectionStrategy.OnPush, + imports: [ + NzCardModule, + NzFormModule, + NzInputNumberModule, + HumanizeWatermarkPipe, + FormsModule, + NzButtonModule, + NzAlertModule, + NzTableModule, + NzMessageModule, + CommonModule, + NzSpaceModule, + HumanizeWatermarkToDatetimePipe, + NzSelectModule + ], + standalone: true +}) +export class TaskManagerProfilerComponent implements OnInit, OnDestroy { + private readonly destroy$ = new Subject<void>(); + profilingList: ProfilingDetail[] = []; + isLoading = true; + isCreating = false; + duration = 30; + selectMode = 'CPU'; + isEnabled = false; + formatterDuration = (value: 
number): string => `${value} s`; + parserDuration = (value: string): string => value.replace(' s', ''); + + constructor( + private taskManagerService: TaskManagerService, + private readonly activatedRoute: ActivatedRoute, + private readonly statusService: StatusService, + private message: NzMessageService, + private cdr: ChangeDetectorRef + ) {} + + public createProfilingInstance(): void { + if (this.profilingList.length > 0 && this.profilingList[0].status === 'RUNNING') { + this.message.warning('Please wait for last profiling finished.'); + return; + } + this.isCreating = true; + const taskManagerId = this.activatedRoute.parent!.snapshot.params.taskManagerId; + this.taskManagerService.createProfilingInstance(taskManagerId, this.selectMode, this.duration).subscribe({ + next: profilingDetail => { + this.profilingList.unshift(profilingDetail); + this.isCreating = false; + this.cdr.markForCheck(); + }, + error: () => { + this.isCreating = false; + this.cdr.markForCheck(); + } + }); + } + + public ngOnInit(): void { + const taskManagerId = this.activatedRoute.parent!.snapshot.params.taskManagerId; + this.statusService.refresh$ + .pipe( + startWith(true), + mergeMap(() => { + this.isLoading = true; + this.cdr.markForCheck(); + return this.taskManagerService.loadProfilingList(taskManagerId); + }), + takeUntil(this.destroy$) + ) + .subscribe({ + next: data => { + this.profilingList = data.profilingList; + this.isLoading = false; + this.isEnabled = true; + this.cdr.markForCheck(); + }, + error: () => { + this.isLoading = false; + this.destroy$.next(); + this.destroy$.complete(); + this.cdr.markForCheck(); + } + }); + } + + public downloadProfilingResult(filePath: string): void { + const taskManagerId = this.activatedRoute.parent!.snapshot.params.taskManagerId; + this.isLoading = true; + this.cdr.markForCheck(); + + this.taskManagerService.loadProfilingResult(taskManagerId, filePath).subscribe({ + next: data => { + const anchor = document.createElement('a'); + anchor.href = 
data.url; + anchor.download = data.url; + document.body.appendChild(anchor); + anchor.click(); + }, + complete: () => { + this.isLoading = false; + this.cdr.markForCheck(); + } + }); + } + + public ngOnDestroy(): void { + this.destroy$.next(); + this.destroy$.complete(); + } +} diff --git a/flink-runtime-web/web-dashboard/src/app/pages/task-manager/routes.ts b/flink-runtime-web/web-dashboard/src/app/pages/task-manager/routes.ts index bd72b8fbda7..7c3a9158544 100644 --- a/flink-runtime-web/web-dashboard/src/app/pages/task-manager/routes.ts +++ b/flink-runtime-web/web-dashboard/src/app/pages/task-manager/routes.ts @@ -54,6 +54,14 @@ export const TASK_MANAGER_ROUTES: Routes = [ path: 'thread-dump' } }, + { + path: 'profiler', + loadComponent: () => + import('./profiler/task-manager-profiler.component').then(m => m.TaskManagerProfilerComponent), + data: { + path: 'profiler' + } + }, { path: 'log-list/:logName', loadComponent: () => diff --git a/flink-runtime-web/web-dashboard/src/app/pages/task-manager/status/task-manager-status.component.ts b/flink-runtime-web/web-dashboard/src/app/pages/task-manager/status/task-manager-status.component.ts index a8b98d688c0..04db56f8d10 100644 --- a/flink-runtime-web/web-dashboard/src/app/pages/task-manager/status/task-manager-status.component.ts +++ b/flink-runtime-web/web-dashboard/src/app/pages/task-manager/status/task-manager-status.component.ts @@ -52,7 +52,8 @@ export class TaskManagerStatusComponent implements OnInit, OnDestroy { { path: 'logs', title: 'Logs' }, { path: 'stdout', title: 'Stdout' }, { path: 'log-list', title: 'Log List' }, - { path: 'thread-dump', title: 'Thread Dump' } + { path: 'thread-dump', title: 'Thread Dump' }, + { path: 'profiler', title: 'Profiler' } ]; public taskManagerDetail?: TaskManagerDetail; public loading = true; diff --git a/flink-runtime-web/web-dashboard/src/app/services/task-manager.service.ts b/flink-runtime-web/web-dashboard/src/app/services/task-manager.service.ts index 
017538ae957..6e387feceec 100644 --- a/flink-runtime-web/web-dashboard/src/app/services/task-manager.service.ts +++ b/flink-runtime-web/web-dashboard/src/app/services/task-manager.service.ts @@ -31,6 +31,7 @@ import { TaskManagersItem, TaskManagerThreadDump } from '@flink-runtime-web/interfaces'; +import { ProfilingDetail, ProfilingList } from '@flink-runtime-web/interfaces/job-profiler'; import { ConfigService } from './config.service'; @@ -119,4 +120,30 @@ export class TaskManagerService { .get<{ url: string }>(`${this.configService.BASE_URL}/jobs/${jobId}/taskmanagers/${taskManagerId}/log-url`) .pipe(map(data => data.url)); } + + loadProfilingList(taskManagerId: string): Observable<ProfilingList> { + return this.httpClient.get<ProfilingList>(`${this.configService.BASE_URL}/taskmanagers/${taskManagerId}/profiler`); + } + + createProfilingInstance(taskManagerId: string, mode: string, duration: number): Observable<ProfilingDetail> { + const requestParam = { mode, duration }; + return this.httpClient.post<ProfilingDetail>( + `${this.configService.BASE_URL}/taskmanagers/${taskManagerId}/profiler`, + requestParam + ); + } + + loadProfilingResult(taskManagerId: string, filePath: string): Observable<Record<string, string>> { + const url = `${this.configService.BASE_URL}/taskmanagers/${taskManagerId}/profiler/${filePath}`; + return this.httpClient + .get(url, { responseType: 'text', headers: new HttpHeaders().append('Cache-Control', 'no-cache') }) + .pipe( + map(data => { + return { + data, + url + }; + }) + ); + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java index 400289aa849..efb0d55f517 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java @@ -61,6 +61,7 @@ import 
org.apache.flink.runtime.resourcemanager.slotmanager.ResourceAllocator; import org.apache.flink.runtime.resourcemanager.slotmanager.ResourceEventListener; import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager; import org.apache.flink.runtime.rest.messages.LogInfo; +import org.apache.flink.runtime.rest.messages.ProfilingInfo; import org.apache.flink.runtime.rest.messages.ThreadDumpInfo; import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerInfo; import org.apache.flink.runtime.rpc.FatalErrorHandler; @@ -899,6 +900,40 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable> } } + @Override + public CompletableFuture<Collection<ProfilingInfo>> requestTaskManagerProfilingList( + ResourceID taskManagerId, Duration timeout) { + final WorkerRegistration<WorkerType> taskExecutor = taskExecutors.get(taskManagerId); + if (taskExecutor == null) { + log.debug( + "Requested profiling list from unregistered TaskExecutor {}.", + taskManagerId.getStringWithMetadata()); + return FutureUtils.completedExceptionally( + new UnknownTaskExecutorException(taskManagerId)); + } else { + return taskExecutor.getTaskExecutorGateway().requestProfilingList(timeout); + } + } + + @Override + public CompletableFuture<ProfilingInfo> requestProfiling( + ResourceID taskManagerId, + int duration, + ProfilingInfo.ProfilingMode mode, + Duration timeout) { + final WorkerRegistration<WorkerType> taskExecutor = taskExecutors.get(taskManagerId); + + if (taskExecutor == null) { + log.debug( + "Requested profiling from unregistered TaskExecutor {}.", + taskManagerId.getStringWithMetadata()); + return FutureUtils.completedExceptionally( + new UnknownTaskExecutorException(taskManagerId)); + } else { + return taskExecutor.getTaskExecutorGateway().requestProfiling(duration, mode, timeout); + } + } + @Override @Local // Bug; see FLINK-27954 public CompletableFuture<TaskExecutorThreadInfoGateway> requestTaskExecutorThreadInfoGateway( diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java index 83bd69dc3d8..9faf8847493 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java @@ -37,6 +37,8 @@ import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.metrics.dump.MetricQueryService; import org.apache.flink.runtime.registration.RegistrationResponse; import org.apache.flink.runtime.rest.messages.LogInfo; +import org.apache.flink.runtime.rest.messages.ProfilingInfo; +import org.apache.flink.runtime.rest.messages.ProfilingInfo.ProfilingMode; import org.apache.flink.runtime.rest.messages.ThreadDumpInfo; import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerInfo; import org.apache.flink.runtime.rpc.FencedRpcGateway; @@ -288,4 +290,30 @@ public interface ResourceManagerGateway */ CompletableFuture<TaskExecutorThreadInfoGateway> requestTaskExecutorThreadInfoGateway( ResourceID taskManagerId, @RpcTimeout Time timeout); + + /** + * Request profiling list from the given {@link TaskExecutor}. + * + * @param taskManagerId identifying the {@link TaskExecutor} to get profiling list from + * @param timeout for the asynchronous operation + * @return Future which is completed with the historical profiling list + */ + CompletableFuture<Collection<ProfilingInfo>> requestTaskManagerProfilingList( + ResourceID taskManagerId, @RpcTimeout Duration timeout); + + /** + * Requests the profiling instance from the given {@link TaskExecutor}. 
+ * + * @param taskManagerId taskManagerId identifying the {@link TaskExecutor} to get the profiling + * from + * @param duration profiling duration + * @param mode profiling mode {@link ProfilingMode} + * @param timeout timeout of the asynchronous operation + * @return Future containing the created profiling information + */ + CompletableFuture<ProfilingInfo> requestProfiling( + ResourceID taskManagerId, + int duration, + ProfilingInfo.ProfilingMode mode, + @RpcTimeout Duration timeout); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerProfilingFileHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerProfilingFileHandler.java new file mode 100644 index 00000000000..ae1b4b5f4a6 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerProfilingFileHandler.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rest.handler.taskmanager; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.blob.TransientBlobKey; +import org.apache.flink.runtime.blob.TransientBlobService; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.ProfilingFileNamePathParameter; +import org.apache.flink.runtime.rest.messages.UntypedResponseMessageHeaders; +import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerProfilingFileMessageParameters; +import org.apache.flink.runtime.taskexecutor.FileType; +import org.apache.flink.runtime.taskexecutor.TaskExecutor; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; + +import javax.annotation.Nonnull; + +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +/** Rest handler which serves the profiling result file of the {@link TaskExecutor}. */ +public class TaskManagerProfilingFileHandler + extends AbstractTaskManagerFileHandler<TaskManagerProfilingFileMessageParameters> { + + public TaskManagerProfilingFileHandler( + @Nonnull GatewayRetriever<? 
extends RestfulGateway> leaderRetriever, + @Nonnull Time timeout, + @Nonnull Map<String, String> responseHeaders, + @Nonnull + UntypedResponseMessageHeaders< + EmptyRequestBody, TaskManagerProfilingFileMessageParameters> + untypedResponseMessageHeaders, + @Nonnull GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever, + @Nonnull TransientBlobService transientBlobService, + @Nonnull Time cacheEntryDuration) { + super( + leaderRetriever, + timeout, + responseHeaders, + untypedResponseMessageHeaders, + resourceManagerGatewayRetriever, + transientBlobService, + cacheEntryDuration); + } + + @Override + protected CompletableFuture<TransientBlobKey> requestFileUpload( + ResourceManagerGateway resourceManagerGateway, + Tuple2<ResourceID, String> taskManagerIdAndFileName) { + return resourceManagerGateway.requestTaskManagerFileUploadByNameAndType( + taskManagerIdAndFileName.f0, + taskManagerIdAndFileName.f1, + FileType.PROFILER, + getTimeout()); + } + + @Override + protected String getFileName(HandlerRequest<EmptyRequestBody> handlerRequest) { + return handlerRequest.getPathParameter(ProfilingFileNamePathParameter.class); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerProfilingHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerProfilingHandler.java new file mode 100644 index 00000000000..b345c9aa135 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerProfilingHandler.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.taskmanager; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.RestOptions; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.handler.resourcemanager.AbstractResourceManagerHandler; +import org.apache.flink.runtime.rest.messages.MessageHeaders; +import org.apache.flink.runtime.rest.messages.ProfilingInfo; +import org.apache.flink.runtime.rest.messages.cluster.ProfilingRequestBody; +import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerIdPathParameter; +import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerMessageParameters; +import org.apache.flink.runtime.taskexecutor.TaskExecutor; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import org.apache.flink.util.concurrent.FutureUtils; + +import javax.annotation.Nonnull; + +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +/** Rest handler which serves the profiling service from a {@link TaskExecutor}. 
*/ +public class TaskManagerProfilingHandler + extends AbstractResourceManagerHandler< + RestfulGateway, ProfilingRequestBody, ProfilingInfo, TaskManagerMessageParameters> { + private final long maxDurationInSeconds; + + public TaskManagerProfilingHandler( + GatewayRetriever<? extends RestfulGateway> leaderRetriever, + Time timeout, + Map<String, String> responseHeaders, + MessageHeaders<ProfilingRequestBody, ProfilingInfo, TaskManagerMessageParameters> + messageHeaders, + GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever, + final Configuration configuration) { + super( + leaderRetriever, + timeout, + responseHeaders, + messageHeaders, + resourceManagerGatewayRetriever); + this.maxDurationInSeconds = + configuration.get(RestOptions.MAX_PROFILING_DURATION).getSeconds(); + } + + @Override + protected CompletableFuture<ProfilingInfo> handleRequest( + @Nonnull HandlerRequest<ProfilingRequestBody> request, + @Nonnull ResourceManagerGateway gateway) + throws RestHandlerException { + ProfilingRequestBody profilingRequest = request.getRequestBody(); + int duration = profilingRequest.getDuration(); + if (duration <= 0 || duration > maxDurationInSeconds) { + return FutureUtils.completedExceptionally( + new IllegalArgumentException( + String.format( + "`duration` must be set between (0s, %ds].", + maxDurationInSeconds))); + } + final ResourceID taskManagerId = request.getPathParameter(TaskManagerIdPathParameter.class); + return gateway.requestProfiling( + taskManagerId, duration, profilingRequest.getMode(), getTimeout()); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerProfilingListHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerProfilingListHandler.java new file mode 100644 index 00000000000..7528d1a9b7b --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerProfilingListHandler.java 
@@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.taskmanager; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; +import org.apache.flink.runtime.resourcemanager.exceptions.UnknownTaskExecutorException; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.handler.resourcemanager.AbstractResourceManagerHandler; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.MessageHeaders; +import org.apache.flink.runtime.rest.messages.ProfilingInfo; +import org.apache.flink.runtime.rest.messages.ProfilingInfoList; +import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerIdPathParameter; +import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerMessageParameters; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import 
org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.Preconditions; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +import javax.annotation.Nonnull; + +import java.util.Collection; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; + +/** Handler which serves detailed TaskManager profiling list information. */ +public class TaskManagerProfilingListHandler + extends AbstractResourceManagerHandler< + RestfulGateway, EmptyRequestBody, ProfilingInfoList, TaskManagerMessageParameters> { + + private final GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever; + + public TaskManagerProfilingListHandler( + GatewayRetriever<? extends RestfulGateway> leaderRetriever, + Time timeout, + Map<String, String> responseHeaders, + MessageHeaders<EmptyRequestBody, ProfilingInfoList, TaskManagerMessageParameters> + messageHeaders, + GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever) { + super( + leaderRetriever, + timeout, + responseHeaders, + messageHeaders, + resourceManagerGatewayRetriever); + + this.resourceManagerGatewayRetriever = + Preconditions.checkNotNull(resourceManagerGatewayRetriever); + } + + @Override + protected CompletableFuture<ProfilingInfoList> handleRequest( + @Nonnull HandlerRequest<EmptyRequestBody> request, + @Nonnull ResourceManagerGateway gateway) + throws RestHandlerException { + final ResourceID taskManagerId = request.getPathParameter(TaskManagerIdPathParameter.class); + final ResourceManagerGateway resourceManagerGateway = + getResourceManagerGateway(resourceManagerGatewayRetriever); + final CompletableFuture<Collection<ProfilingInfo>> profilingListFuture = + resourceManagerGateway.requestTaskManagerProfilingList(taskManagerId, getTimeout()); + + return profilingListFuture + .thenApply(ProfilingInfoList::new) + .exceptionally( + (throwable) -> { + final Throwable strippedThrowable = + 
ExceptionUtils.stripCompletionException(throwable); + if (strippedThrowable instanceof UnknownTaskExecutorException) { + throw new CompletionException( + new RestHandlerException( + "Could not find TaskExecutor " + taskManagerId, + HttpResponseStatus.NOT_FOUND, + strippedThrowable)); + } else { + throw new CompletionException(throwable); + } + }); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerProfilingFileHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerProfilingFileHeaders.java new file mode 100644 index 00000000000..97b1b3b4971 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerProfilingFileHeaders.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rest.messages.taskmanager; + +import org.apache.flink.runtime.rest.HttpMethodWrapper; +import org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerProfilingFileHandler; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.LogFileNamePathParameter; +import org.apache.flink.runtime.rest.messages.RuntimeUntypedResponseMessageHeaders; + +/** Headers for the {@link TaskManagerProfilingFileHandler}. */ +public class TaskManagerProfilingFileHeaders + implements RuntimeUntypedResponseMessageHeaders< + EmptyRequestBody, TaskManagerProfilingFileMessageParameters> { + + private static final TaskManagerProfilingFileHeaders INSTANCE = + new TaskManagerProfilingFileHeaders(); + + private static final String URL = + String.format( + "/taskmanagers/:%s/profiler/:%s", + TaskManagerIdPathParameter.KEY, LogFileNamePathParameter.KEY); + + @Override + public Class<EmptyRequestBody> getRequestClass() { + return EmptyRequestBody.class; + } + + @Override + public TaskManagerProfilingFileMessageParameters getUnresolvedMessageParameters() { + return new TaskManagerProfilingFileMessageParameters(); + } + + @Override + public HttpMethodWrapper getHttpMethod() { + return HttpMethodWrapper.GET; + } + + @Override + public String getTargetRestEndpointURL() { + return URL; + } + + public static TaskManagerProfilingFileHeaders getInstance() { + return INSTANCE; + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerProfilingFileMessageParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerProfilingFileMessageParameters.java new file mode 100644 index 00000000000..6f064543d25 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerProfilingFileMessageParameters.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation 
(ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.taskmanager; + +import org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerProfilingFileHandler; +import org.apache.flink.runtime.rest.messages.MessagePathParameter; +import org.apache.flink.runtime.rest.messages.ProfilingFileNamePathParameter; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; + +/** Parameters for {@link TaskManagerProfilingFileHandler}. 
*/ +public class TaskManagerProfilingFileMessageParameters extends TaskManagerMessageParameters { + + public final ProfilingFileNamePathParameter profilingFileNamePathParameter = + new ProfilingFileNamePathParameter(); + + @Override + public Collection<MessagePathParameter<?>> getPathParameters() { + return Collections.unmodifiableCollection( + Arrays.asList(profilingFileNamePathParameter, taskManagerIdParameter)); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerProfilingHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerProfilingHeaders.java new file mode 100644 index 00000000000..bab34f834d5 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerProfilingHeaders.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rest.messages.taskmanager; + +import org.apache.flink.runtime.rest.HttpMethodWrapper; +import org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerProfilingHandler; +import org.apache.flink.runtime.rest.messages.ProfilingInfo; +import org.apache.flink.runtime.rest.messages.RuntimeMessageHeaders; +import org.apache.flink.runtime.rest.messages.cluster.ProfilingRequestBody; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +/** Headers for the {@link TaskManagerProfilingHandler}. */ +public class TaskManagerProfilingHeaders + implements RuntimeMessageHeaders< + ProfilingRequestBody, ProfilingInfo, TaskManagerMessageParameters> { + + private static final TaskManagerProfilingHeaders INSTANCE = new TaskManagerProfilingHeaders(); + + private static final String URL = + String.format("/taskmanagers/:%s/profiler", TaskManagerIdPathParameter.KEY); + + private TaskManagerProfilingHeaders() {} + + @Override + public Class<ProfilingRequestBody> getRequestClass() { + return ProfilingRequestBody.class; + } + + @Override + public TaskManagerMessageParameters getUnresolvedMessageParameters() { + return new TaskManagerMessageParameters(); + } + + @Override + public HttpMethodWrapper getHttpMethod() { + return HttpMethodWrapper.POST; + } + + @Override + public String getTargetRestEndpointURL() { + return URL; + } + + public static TaskManagerProfilingHeaders getInstance() { + return INSTANCE; + } + + @Override + public Class<ProfilingInfo> getResponseClass() { + return ProfilingInfo.class; + } + + @Override + public HttpResponseStatus getResponseStatusCode() { + return HttpResponseStatus.OK; + } + + @Override + public String getDescription() { + return "Returns the profiling instance of the requested TaskManager."; + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerProfilingListHeaders.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerProfilingListHeaders.java new file mode 100644 index 00000000000..6f8267ee755 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerProfilingListHeaders.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.taskmanager; + +import org.apache.flink.runtime.rest.HttpMethodWrapper; +import org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerProfilingListHandler; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.ProfilingInfoList; +import org.apache.flink.runtime.rest.messages.RuntimeMessageHeaders; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +/** Headers for the {@link TaskManagerProfilingListHandler}. 
*/ +public class TaskManagerProfilingListHeaders + implements RuntimeMessageHeaders< + EmptyRequestBody, ProfilingInfoList, TaskManagerMessageParameters> { + + private static final TaskManagerProfilingListHeaders INSTANCE = + new TaskManagerProfilingListHeaders(); + + private static final String URL = + String.format("/taskmanagers/:%s/profiler", TaskManagerIdPathParameter.KEY); + + private TaskManagerProfilingListHeaders() {} + + public static TaskManagerProfilingListHeaders getInstance() { + return INSTANCE; + } + + @Override + public Class<ProfilingInfoList> getResponseClass() { + return ProfilingInfoList.class; + } + + @Override + public HttpResponseStatus getResponseStatusCode() { + return HttpResponseStatus.OK; + } + + @Override + public String getDescription() { + return "Returns the list of profiling result files on a TaskManager."; + } + + @Override + public Class<EmptyRequestBody> getRequestClass() { + return EmptyRequestBody.class; + } + + @Override + public TaskManagerMessageParameters getUnresolvedMessageParameters() { + return new TaskManagerMessageParameters(); + } + + @Override + public HttpMethodWrapper getHttpMethod() { + return HttpMethodWrapper.GET; + } + + @Override + public String getTargetRestEndpointURL() { + return URL; + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/FileType.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/FileType.java index 2e20fa35d12..abb1505c40b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/FileType.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/FileType.java @@ -25,4 +25,7 @@ public enum FileType { /** The stdout file type for taskmanager. */ STDOUT, + + /** The profiler file type for taskmanager. 
*/ + PROFILER, } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java index da753f4b8e4..8ac63f602af 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java @@ -92,6 +92,7 @@ import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; import org.apache.flink.runtime.resourcemanager.ResourceManagerId; import org.apache.flink.runtime.resourcemanager.TaskExecutorRegistration; import org.apache.flink.runtime.rest.messages.LogInfo; +import org.apache.flink.runtime.rest.messages.ProfilingInfo; import org.apache.flink.runtime.rest.messages.ThreadDumpInfo; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcEndpoint; @@ -134,6 +135,7 @@ import org.apache.flink.runtime.taskmanager.TaskExecutionState; import org.apache.flink.runtime.taskmanager.TaskManagerActions; import org.apache.flink.runtime.taskmanager.UnresolvedTaskManagerLocation; import org.apache.flink.runtime.util.GroupCache; +import org.apache.flink.runtime.util.profiler.ProfilingService; import org.apache.flink.runtime.webmonitor.threadinfo.ThreadInfoSamplesRequest; import org.apache.flink.types.SerializableOptional; import org.apache.flink.util.CollectionUtil; @@ -305,6 +307,8 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { private final GroupCache<JobID, PermanentBlobKey, ShuffleDescriptorGroup> shuffleDescriptorsCache; + private final ProfilingService profilingService; + public TaskExecutor( RpcService rpcService, TaskManagerConfiguration taskManagerConfiguration, @@ -375,6 +379,8 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { ScheduledExecutorService sampleExecutor = Executors.newSingleThreadScheduledExecutor(sampleThreadFactory); 
this.threadInfoSampleService = new ThreadInfoSampleService(sampleExecutor); + this.profilingService = + ProfilingService.getInstance(taskManagerConfiguration.getConfiguration()); this.slotAllocationSnapshotPersistenceService = taskExecutorServices.getSlotAllocationSnapshotPersistenceService(); @@ -1304,6 +1310,9 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { case LOG: baseDir = taskManagerConfiguration.getTaskManagerLogDir(); break; + case PROFILER: + baseDir = profilingService.getProfilingResultDir(); + break; default: baseDir = null; } @@ -1411,6 +1420,18 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { } } + @Override + public CompletableFuture<ProfilingInfo> requestProfiling( + int duration, ProfilingInfo.ProfilingMode mode, Duration timeout) { + return profilingService.requestProfiling( + getResourceID().getResourceIdString(), duration, mode); + } + + @Override + public CompletableFuture<Collection<ProfilingInfo>> requestProfilingList(Duration timeout) { + return profilingService.getProfilingList(getResourceID().getResourceIdString()); + } + // ------------------------------------------------------------------------ // Internal resource manager connection methods // ------------------------------------------------------------------------ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java index c70e817cb33..d3e9dd119a5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java @@ -39,6 +39,7 @@ import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.operators.coordination.OperatorEvent; import org.apache.flink.runtime.resourcemanager.ResourceManagerId; import 
org.apache.flink.runtime.rest.messages.LogInfo; +import org.apache.flink.runtime.rest.messages.ProfilingInfo; import org.apache.flink.runtime.rest.messages.ThreadDumpInfo; import org.apache.flink.runtime.rpc.RpcGateway; import org.apache.flink.runtime.rpc.RpcTimeout; @@ -314,4 +315,22 @@ public interface TaskExecutorGateway */ CompletableFuture<Acknowledge> updateDelegationTokens( ResourceManagerId resourceManagerId, byte[] tokens); + + /** + * Requests the profiling from this TaskManager. + * + * @param duration profiling duration + * @param mode profiling mode {@link ProfilingInfo.ProfilingMode} + * @param timeout timeout for the asynchronous operation + * @return the {@link ProfilingInfo} for this TaskManager. + */ + CompletableFuture<ProfilingInfo> requestProfiling( + int duration, ProfilingInfo.ProfilingMode mode, @RpcTimeout Duration timeout); + + /** + * Requests for the historical profiling file names on the TaskManager. + * + * @return A Collection with all profiling instances information. 
+ */ + CompletableFuture<Collection<ProfilingInfo>> requestProfilingList(@RpcTimeout Duration timeout); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGatewayDecoratorBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGatewayDecoratorBase.java index e0e8ff7af48..bd180c0de73 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGatewayDecoratorBase.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGatewayDecoratorBase.java @@ -39,6 +39,7 @@ import org.apache.flink.runtime.messages.TaskThreadInfoResponse; import org.apache.flink.runtime.operators.coordination.OperatorEvent; import org.apache.flink.runtime.resourcemanager.ResourceManagerId; import org.apache.flink.runtime.rest.messages.LogInfo; +import org.apache.flink.runtime.rest.messages.ProfilingInfo; import org.apache.flink.runtime.rest.messages.ThreadDumpInfo; import org.apache.flink.runtime.webmonitor.threadinfo.ThreadInfoSamplesRequest; import org.apache.flink.types.SerializableOptional; @@ -245,6 +246,17 @@ public class TaskExecutorGatewayDecoratorBase implements TaskExecutorGateway { return originalGateway.updateDelegationTokens(resourceManagerId, tokens); } + @Override + public CompletableFuture<ProfilingInfo> requestProfiling( + int duration, ProfilingInfo.ProfilingMode mode, Duration timeout) { + return originalGateway.requestProfiling(duration, mode, timeout); + } + + @Override + public CompletableFuture<Collection<ProfilingInfo>> requestProfilingList(Duration timeout) { + return originalGateway.requestProfilingList(timeout); + } + @Override public CompletableFuture<TaskThreadInfoResponse> requestThreadInfoSamples( Collection<ExecutionAttemptID> taskExecutionAttemptIds, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/profiler/ProfilingService.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/profiler/ProfilingService.java index d8a67339810..5af2dfd6336 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/profiler/ProfilingService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/profiler/ProfilingService.java @@ -179,6 +179,10 @@ public class ProfilingService implements Closeable { profilingMap.getOrDefault(resourceID, new ArrayDeque<>())); } + public String getProfilingResultDir() { + return profilingResultDir; + } + @VisibleForTesting ArrayDeque<ProfilingInfo> getProfilingListForTest(String resourceID) { return profilingMap.getOrDefault(resourceID, new ArrayDeque<>()); @@ -194,11 +198,6 @@ public class ProfilingService implements Closeable { return profilingFuture; } - @VisibleForTesting - public String getProfilingResultDir() { - return profilingResultDir; - } - enum ProfilerConstants { PROFILER_STARTED_SUCCESS("Profiling started"), PROFILER_STOPPED_SUCCESS("OK"), diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java index 42fd92f47e5..95f142b4b15 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java @@ -101,6 +101,9 @@ import org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerCustomLogHan import org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerDetailsHandler; import org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerLogFileHandler; import org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerLogListHandler; +import org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerProfilingFileHandler; +import org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerProfilingHandler; +import 
org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerProfilingListHandler; import org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerStdoutFileHandler; import org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerThreadDumpHandler; import org.apache.flink.runtime.rest.handler.taskmanager.TaskManagersHandler; @@ -152,6 +155,9 @@ import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerCustomLogHe import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerDetailsHeaders; import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerLogFileHeaders; import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerLogsHeaders; +import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerProfilingFileHeaders; +import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerProfilingHeaders; +import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerProfilingListHeaders; import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerStdoutFileHeaders; import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerThreadDumpHeaders; import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersHeaders; @@ -964,6 +970,8 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp handlers.add( Tuple2.of(JobManagerThreadDumpHeaders.getInstance(), jobManagerThreadDumpHandler)); + final Time cacheEntryDuration = Time.milliseconds(restConfiguration.getRefreshInterval()); + // load profiler relative handlers if (clusterConfiguration.get(RestOptions.ENABLE_PROFILER)) { final JobManagerProfilingHandler jobManagerProfilingHandler = @@ -987,6 +995,32 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp responseHeaders, JobManagerProfilingFileHeaders.getInstance(), clusterConfiguration); + + final TaskManagerProfilingHandler taskManagerProfilingHandler = + new TaskManagerProfilingHandler( + leaderRetriever, + timeout, + 
responseHeaders, + TaskManagerProfilingHeaders.getInstance(), + resourceManagerRetriever, + clusterConfiguration); + + final TaskManagerProfilingListHandler taskManagerProfilingListHandler = + new TaskManagerProfilingListHandler( + leaderRetriever, + timeout, + responseHeaders, + TaskManagerProfilingListHeaders.getInstance(), + resourceManagerRetriever); + final TaskManagerProfilingFileHandler taskManagerProfilingFileHandler = + new TaskManagerProfilingFileHandler( + leaderRetriever, + timeout, + responseHeaders, + TaskManagerProfilingFileHeaders.getInstance(), + resourceManagerRetriever, + transientBlobService, + cacheEntryDuration); handlers.add( Tuple2.of( JobManagerProfilingHeaders.getInstance(), jobManagerProfilingHandler)); @@ -998,12 +1032,22 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp Tuple2.of( JobManagerProfilingFileHeaders.getInstance(), jobManagerProfilingFileHandler)); + handlers.add( + Tuple2.of( + TaskManagerProfilingHeaders.getInstance(), + taskManagerProfilingHandler)); + handlers.add( + Tuple2.of( + TaskManagerProfilingListHeaders.getInstance(), + taskManagerProfilingListHandler)); + handlers.add( + Tuple2.of( + TaskManagerProfilingFileHeaders.getInstance(), + taskManagerProfilingFileHandler)); } // TaskManager log and stdout file handler - final Time cacheEntryDuration = Time.milliseconds(restConfiguration.getRefreshInterval()); - final TaskManagerLogFileHandler taskManagerLogFileHandler = new TaskManagerLogFileHandler( leaderRetriever, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java index 91e13eb34f5..088fed278bb 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java +++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java @@ -45,6 +45,7 @@ import org.apache.flink.runtime.resourcemanager.TaskExecutorRegistration; import org.apache.flink.runtime.resourcemanager.TaskManagerInfoWithSlots; import org.apache.flink.runtime.resourcemanager.exceptions.UnknownTaskExecutorException; import org.apache.flink.runtime.rest.messages.LogInfo; +import org.apache.flink.runtime.rest.messages.ProfilingInfo; import org.apache.flink.runtime.rest.messages.ThreadDumpInfo; import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerInfo; import org.apache.flink.runtime.shuffle.ShuffleDescriptor; @@ -123,6 +124,12 @@ public class TestingResourceManagerGateway implements ResourceManagerGateway { private volatile Function<ResourceID, CompletableFuture<ThreadDumpInfo>> requestThreadDumpFunction; + private volatile Function<ResourceID, CompletableFuture<ProfilingInfo>> + requestProfilingFunction; + + private volatile Function<ResourceID, CompletableFuture<Collection<ProfilingInfo>>> + requestProfilingListFunction; + private volatile BiFunction<JobMasterId, ResourceRequirements, CompletableFuture<Acknowledge>> declareRequiredResourcesFunction = (ignoredA, ignoredB) -> @@ -249,6 +256,17 @@ public class TestingResourceManagerGateway implements ResourceManagerGateway { this.requestThreadDumpFunction = requestThreadDumpFunction; } + public void setRequestProfilingFunction( + Function<ResourceID, CompletableFuture<ProfilingInfo>> requestProfilingFunction) { + this.requestProfilingFunction = requestProfilingFunction; + } + + public void setRequestProfilingListFunction( + Function<ResourceID, CompletableFuture<Collection<ProfilingInfo>>> + requestProfilingListFunction) { + this.requestProfilingListFunction = requestProfilingListFunction; + } + public void setDeclareRequiredResourcesFunction( BiFunction<JobMasterId, ResourceRequirements, CompletableFuture<Acknowledge>> declareRequiredResourcesFunction) 
{ @@ -507,6 +525,37 @@ public class TestingResourceManagerGateway implements ResourceManagerGateway { } } + @Override + public CompletableFuture<Collection<ProfilingInfo>> requestTaskManagerProfilingList( + ResourceID taskManagerId, Duration timeout) { + final Function<ResourceID, CompletableFuture<Collection<ProfilingInfo>>> function = + this.requestProfilingListFunction; + + if (function != null) { + return function.apply(taskManagerId); + } else { + return FutureUtils.completedExceptionally( + new UnknownTaskExecutorException(taskManagerId)); + } + } + + @Override + public CompletableFuture<ProfilingInfo> requestProfiling( + ResourceID taskManagerId, + int duration, + ProfilingInfo.ProfilingMode mode, + Duration timeout) { + final Function<ResourceID, CompletableFuture<ProfilingInfo>> function = + this.requestProfilingFunction; + + if (function != null) { + return function.apply(taskManagerId); + } else { + return FutureUtils.completedExceptionally( + new UnknownTaskExecutorException(taskManagerId)); + } + } + @Override public ResourceManagerId getFencingToken() { return resourceManagerId; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerProfilingHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerProfilingHandlerTest.java new file mode 100644 index 00000000000..b187ad8097b --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerProfilingHandlerTest.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.taskmanager; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.RestOptions; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.resourcemanager.exceptions.UnknownTaskExecutorException; +import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.HandlerRequestException; +import org.apache.flink.runtime.rest.messages.ProfilingInfo; +import org.apache.flink.runtime.rest.messages.cluster.ProfilingRequestBody; +import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerIdPathParameter; +import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerMessageParameters; +import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerProfilingHeaders; +import org.apache.flink.testutils.TestingUtils; +import org.apache.flink.util.concurrent.FutureUtils; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.nio.file.Path; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for the {@link TaskManagerProfilingHandler}. 
*/
+class TaskManagerProfilingHandlerTest {
+
+    private static final ResourceID EXPECTED_TASK_MANAGER_ID = ResourceID.generate();
+    private TestingResourceManagerGateway resourceManagerGateway;
+    private TaskManagerProfilingHandler taskManagerProfilingHandler;
+    private HandlerRequest<ProfilingRequestBody> handlerRequest;
+
+    @BeforeEach
+    void setUp(@TempDir Path tempDir) throws HandlerRequestException {
+        Configuration clusterConfiguration = new Configuration();
+        clusterConfiguration.set(RestOptions.MAX_PROFILING_HISTORY_SIZE, 3);
+        clusterConfiguration.set(RestOptions.PROFILING_RESULT_DIR, tempDir.toString());
+        resourceManagerGateway = new TestingResourceManagerGateway();
+        taskManagerProfilingHandler =
+                new TaskManagerProfilingHandler(
+                        () -> CompletableFuture.completedFuture(null),
+                        TestingUtils.TIMEOUT,
+                        Collections.emptyMap(),
+                        TaskManagerProfilingHeaders.getInstance(),
+                        () -> CompletableFuture.completedFuture(resourceManagerGateway),
+                        clusterConfiguration);
+        handlerRequest = createRequest(EXPECTED_TASK_MANAGER_ID);
+    }
+
+    /** Verifies that the handler returns the {@link ProfilingInfo} supplied by the gateway. */
+    @Test
+    void testGetTaskManagerProfiling() throws Exception {
+        ProfilingInfo profilingInfo = ProfilingInfo.create(30, ProfilingInfo.ProfilingMode.ITIMER);
+        resourceManagerGateway.setRequestProfilingFunction(
+                id -> CompletableFuture.completedFuture(profilingInfo));
+        ProfilingInfo profilingInfoResp =
+                taskManagerProfilingHandler
+                        .handleRequest(handlerRequest, resourceManagerGateway)
+                        .get();
+        assertThat(profilingInfoResp).isEqualTo(profilingInfo);
+    }
+
+    /** Verifies that an {@link UnknownTaskExecutorException} from the gateway reaches the caller. */
+    @Test
+    void testGetTaskManagerProfilingForUnknownTaskExecutorException() throws Exception {
+        resourceManagerGateway.setRequestProfilingFunction(
+                id -> FutureUtils.completedExceptionally(new UnknownTaskExecutorException(id)));
+        try {
+            taskManagerProfilingHandler.handleRequest(handlerRequest, resourceManagerGateway).get();
+            // No exception was thrown: fail instead of letting the test pass silently.
+            throw new AssertionError("Expected the request to fail for an unknown TaskExecutor.");
+        } catch (ExecutionException e) {
+            assertThat(e.getCause())
+                    .isInstanceOf(UnknownTaskExecutorException.class)
+                    .hasMessageContaining(
+                            "No TaskExecutor registered under " + EXPECTED_TASK_MANAGER_ID);
+        }
+    }
+
+    /** Creates a request for the given TaskManager with an ITIMER {@link ProfilingRequestBody}. */
+    private static HandlerRequest<ProfilingRequestBody> createRequest(ResourceID taskManagerId)
+            throws HandlerRequestException {
+        Map<String, String> pathParameters = new HashMap<>();
+        pathParameters.put(TaskManagerIdPathParameter.KEY, taskManagerId.toString());
+        Map<String, List<String>> queryParameters = Collections.emptyMap();
+        ProfilingRequestBody requestBody =
+                new ProfilingRequestBody(ProfilingInfo.ProfilingMode.ITIMER, 10);
+        return HandlerRequest.resolveParametersAndCreate(
+                requestBody,
+                new TaskManagerMessageParameters(),
+                pathParameters,
+                queryParameters,
+                Collections.emptyList());
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerProfilingListHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerProfilingListHandlerTest.java
new file mode 100644
index 00000000000..ab99d3ee513
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerProfilingListHandlerTest.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.taskmanager; + +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.resourcemanager.exceptions.UnknownTaskExecutorException; +import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.HandlerRequestException; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.ProfilingInfo; +import org.apache.flink.runtime.rest.messages.ProfilingInfoList; +import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerIdPathParameter; +import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerMessageParameters; +import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerProfilingListHeaders; +import org.apache.flink.testutils.TestingUtils; +import org.apache.flink.util.concurrent.FutureUtils; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for the {@link 
TaskManagerProfilingListHandler}. */
+class TaskManagerProfilingListHandlerTest {
+
+    private static final ResourceID EXPECTED_TASK_MANAGER_ID = ResourceID.generate();
+    private TestingResourceManagerGateway resourceManagerGateway;
+    private TaskManagerProfilingListHandler taskManagerProfilingListHandler;
+    private HandlerRequest<EmptyRequestBody> handlerRequest;
+
+    @BeforeEach
+    void setUp() throws HandlerRequestException {
+        resourceManagerGateway = new TestingResourceManagerGateway();
+        taskManagerProfilingListHandler =
+                new TaskManagerProfilingListHandler(
+                        () -> CompletableFuture.completedFuture(null),
+                        TestingUtils.TIMEOUT,
+                        Collections.emptyMap(),
+                        TaskManagerProfilingListHeaders.getInstance(),
+                        () -> CompletableFuture.completedFuture(resourceManagerGateway));
+        handlerRequest = createRequest(EXPECTED_TASK_MANAGER_ID);
+    }
+
+    /** Verifies that the handler returns the profiling infos supplied by the gateway. */
+    @Test
+    void testGetTaskManagerProfilingList() throws Exception {
+        ProfilingInfoList profilingInfoList =
+                new ProfilingInfoList(
+                        Arrays.asList(
+                                ProfilingInfo.create(30, ProfilingInfo.ProfilingMode.ITIMER),
+                                ProfilingInfo.create(10, ProfilingInfo.ProfilingMode.CPU),
+                                ProfilingInfo.create(15, ProfilingInfo.ProfilingMode.ALLOC)));
+        resourceManagerGateway.setRequestProfilingListFunction(
+                id -> CompletableFuture.completedFuture(profilingInfoList.getProfilingInfos()));
+        ProfilingInfoList profilingInfoListResp =
+                taskManagerProfilingListHandler
+                        .handleRequest(handlerRequest, resourceManagerGateway)
+                        .get();
+        assertThat(profilingInfoListResp.getProfilingInfos())
+                .containsExactlyInAnyOrderElementsOf(profilingInfoList.getProfilingInfos());
+    }
+
+    /** Verifies that an unknown TaskManager is reported as a {@code NOT_FOUND} REST error. */
+    @Test
+    void testGetTaskManagerProfilingListForUnknownTaskExecutorException() throws Exception {
+        resourceManagerGateway.setRequestProfilingListFunction(
+                id -> FutureUtils.completedExceptionally(new UnknownTaskExecutorException(id)));
+        try {
+            taskManagerProfilingListHandler
+                    .handleRequest(handlerRequest, resourceManagerGateway)
+                    .get();
+            // No exception was thrown: fail instead of letting the test pass silently.
+            throw new AssertionError("Expected the request to fail for an unknown TaskExecutor.");
+        } catch (ExecutionException e) {
+            assertThat(e.getCause()).isInstanceOf(RestHandlerException.class);
+            final RestHandlerException restHandlerException = (RestHandlerException) e.getCause();
+            assertThat(restHandlerException.getHttpResponseStatus())
+                    .isEqualTo(HttpResponseStatus.NOT_FOUND);
+            assertThat(restHandlerException.getMessage())
+                    .contains("Could not find TaskExecutor " + EXPECTED_TASK_MANAGER_ID);
+        }
+    }
+
+    /** Creates an empty-body request addressed to the given TaskManager. */
+    private static HandlerRequest<EmptyRequestBody> createRequest(ResourceID taskManagerId)
+            throws HandlerRequestException {
+        Map<String, String> pathParameters = new HashMap<>();
+        pathParameters.put(TaskManagerIdPathParameter.KEY, taskManagerId.toString());
+        Map<String, List<String>> queryParameters = Collections.emptyMap();
+
+        return HandlerRequest.resolveParametersAndCreate(
+                EmptyRequestBody.getInstance(),
+                new TaskManagerMessageParameters(),
+                pathParameters,
+                queryParameters,
+                Collections.emptyList());
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
index 7ea5ee1b08e..765206b6728 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
@@ -40,6 +40,7 @@ import org.apache.flink.runtime.messages.TaskThreadInfoResponse;
 import org.apache.flink.runtime.operators.coordination.OperatorEvent;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
 import org.apache.flink.runtime.rest.messages.LogInfo;
+import org.apache.flink.runtime.rest.messages.ProfilingInfo;
 import org.apache.flink.runtime.rest.messages.ThreadDumpInfo;
 import org.apache.flink.runtime.webmonitor.threadinfo.ThreadInfoSamplesRequest;
 import
org.apache.flink.types.SerializableOptional;
@@ -105,6 +106,8 @@ public class TestingTaskExecutorGateway implements TaskExecutorGateway {
 
     private final Supplier<CompletableFuture<ThreadDumpInfo>> requestThreadDumpSupplier;
 
+    private final Supplier<CompletableFuture<ProfilingInfo>> requestProfilingSupplier;
+
     private final Supplier<CompletableFuture<TaskThreadInfoResponse>>
             requestThreadInfoSamplesSupplier;
 
@@ -153,6 +156,7 @@ public class TestingTaskExecutorGateway implements TaskExecutorGateway {
                             CompletableFuture<Acknowledge>>
                     operatorEventHandler,
             Supplier<CompletableFuture<ThreadDumpInfo>> requestThreadDumpSupplier,
+            Supplier<CompletableFuture<ProfilingInfo>> requestProfilingSupplier,
             Supplier<CompletableFuture<TaskThreadInfoResponse>> requestThreadInfoSamplesSupplier,
             QuadFunction<
                             ExecutionAttemptID,
@@ -182,6 +186,7 @@ public class TestingTaskExecutorGateway implements TaskExecutorGateway {
         this.releaseClusterPartitionsConsumer = releaseClusterPartitionsConsumer;
         this.operatorEventHandler = operatorEventHandler;
         this.requestThreadDumpSupplier = requestThreadDumpSupplier;
+        this.requestProfilingSupplier = requestProfilingSupplier;
         this.requestThreadInfoSamplesSupplier = requestThreadInfoSamplesSupplier;
         this.triggerCheckpointFunction = triggerCheckpointFunction;
         this.confirmCheckpointFunction = confirmCheckpointFunction;
@@ -352,6 +357,19 @@ public class TestingTaskExecutorGateway implements TaskExecutorGateway {
         return CompletableFuture.completedFuture(Acknowledge.get());
     }
 
+    /** Returns the future produced by the configured supplier; all arguments are ignored. */
+    @Override
+    public CompletableFuture<ProfilingInfo> requestProfiling(
+            int duration, ProfilingInfo.ProfilingMode mode, Duration timeout) {
+        return requestProfilingSupplier.get();
+    }
+
+    /** Always fails with an {@link UnsupportedOperationException}; no supplier can be set. */
+    @Override
+    public CompletableFuture<Collection<ProfilingInfo>> requestProfilingList(Duration timeout) {
+        return FutureUtils.completedExceptionally(new UnsupportedOperationException());
+    }
+
     @Override
     public String getAddress() {
         return address;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGatewayBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGatewayBuilder.java
index 283a6c9ef35..31f5df12337 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGatewayBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGatewayBuilder.java
@@ -36,6 +36,7 @@ import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.TaskThreadInfoResponse;
 import org.apache.flink.runtime.operators.coordination.OperatorEvent;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
+import org.apache.flink.runtime.rest.messages.ProfilingInfo;
 import org.apache.flink.runtime.rest.messages.ThreadDumpInfo;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.concurrent.FutureUtils;
@@ -96,6 +97,9 @@ public class TestingTaskExecutorGatewayBuilder {
                     (a, b, c) -> CompletableFuture.completedFuture(Acknowledge.get());
     private static final Supplier<CompletableFuture<ThreadDumpInfo>> DEFAULT_THREAD_DUMP_SUPPLIER =
             () -> FutureUtils.completedExceptionally(new UnsupportedOperationException());
+
+    private static final Supplier<CompletableFuture<ProfilingInfo>> DEFAULT_PROFILING_SUPPLIER =
+            () -> FutureUtils.completedExceptionally(new UnsupportedOperationException());
     private static final Supplier<CompletableFuture<TaskThreadInfoResponse>>
             DEFAULT_THREAD_INFO_SAMPLES_SUPPLIER =
                     () -> FutureUtils.completedExceptionally(new UnsupportedOperationException());
@@ -151,6 +155,8 @@ public class TestingTaskExecutorGatewayBuilder {
             operatorEventHandler = DEFAULT_OPERATOR_EVENT_HANDLER;
     private Supplier<CompletableFuture<ThreadDumpInfo>> requestThreadDumpSupplier =
             DEFAULT_THREAD_DUMP_SUPPLIER;
+    private Supplier<CompletableFuture<ProfilingInfo>> requestProfilingSupplier =
+            DEFAULT_PROFILING_SUPPLIER;
     private Supplier<CompletableFuture<TaskThreadInfoResponse>> requestThreadInfoSamplesSupplier =
             DEFAULT_THREAD_INFO_SAMPLES_SUPPLIER;
 
@@ -281,6 +287,13 @@ public class TestingTaskExecutorGatewayBuilder {
         this.requestThreadDumpSupplier = requestThreadDumpSupplier;
     }
 
+    /** Sets the supplier used to answer {@code requestProfiling} calls. */
+    public TestingTaskExecutorGatewayBuilder setRequestProfilingSupplier(
+            Supplier<CompletableFuture<ProfilingInfo>> requestProfilingSupplier) {
+        this.requestProfilingSupplier = requestProfilingSupplier;
+        return this;
+    }
+
     public TestingTaskExecutorGatewayBuilder setRequestThreadInfoSamplesSupplier(
             Supplier<CompletableFuture<TaskThreadInfoResponse>> requestThreadInfoSamplesSupplier) {
         this.requestThreadInfoSamplesSupplier = requestThreadInfoSamplesSupplier;
@@ -325,6 +338,7 @@ public class TestingTaskExecutorGatewayBuilder {
                 releaseClusterPartitionsConsumer,
                 operatorEventHandler,
                 requestThreadDumpSupplier,
+                requestProfilingSupplier,
                 requestThreadInfoSamplesSupplier,
                 triggerCheckpointFunction,
                 confirmCheckpointFunction);