yuchen-ecnu commented on code in PR #23820: URL: https://github.com/apache/flink/pull/23820#discussion_r1411956350
########## flink-runtime-web/web-dashboard/src/app/pages/job-manager/profiler/job-manager-profiler.component.html: ########## @@ -0,0 +1,108 @@ +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<nz-card nzSize="small" style="width: 100%; height: 100%"> + <div style="margin-bottom: 10px"> + <form nz-form [nzLayout]="'inline'"> + <nz-form-item> + <nz-form-label nzFor="duration">Profiling Duration</nz-form-label> + <nz-form-control> + <nz-input-number + [(ngModel)]="duration" + [nzMin]="1" + [nzStep]="30" + [nzFormatter]="formatterDuration" + [nzParser]="parserDuration" + nzPlaceHolder="Duration" + name="duration" + ></nz-input-number> + </nz-form-control> + </nz-form-item> + <nz-form-item> + <nz-form-label nzFor="mode">Profiling Mode</nz-form-label> Review Comment: Okay, I have added a description for each profiling mode, with a link to the async-profiler wiki. ########## flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/ProfilingService.java: ########## @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.runtime.taskexecutor; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.RestOptions; +import org.apache.flink.runtime.rest.messages.ProfilingInfo; +import org.apache.flink.util.CollectionUtil; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.StringUtils; +import org.apache.flink.util.concurrent.ExecutorThreadFactory; +import org.apache.flink.util.concurrent.FutureUtils; + +import one.profiler.AsyncProfiler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.text.SimpleDateFormat; +import java.util.ArrayDeque; +import java.util.Collection; +import java.util.Date; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +/** Create and keep profiling requests with rolling policy. 
*/ +public class ProfilingService implements Closeable { + + protected static final Logger LOG = LoggerFactory.getLogger(ProfilingService.class); + private static ProfilingService instance; + private final Map<String, ArrayDeque<ProfilingInfo>> profilingMap; + private ScheduledFuture<?> profilingFuture; + private final String profilingResultDir; + private final int historySize; + private final ScheduledExecutorService scheduledExecutor; + + private static final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd_HH_mm_ss"); + + public static ProfilingService getInstance(Configuration configs) { + if (instance == null) { + instance = new ProfilingService(configs); + } + return instance; + } + + private ProfilingService(Configuration configs) { + this.profilingMap = new HashMap<>(); + this.historySize = configs.getInteger(RestOptions.MAX_PROFILING_HISTORY_SIZE); + Preconditions.checkArgument( + historySize > 0, + String.format( + "Configured %s must be positive.", + RestOptions.MAX_PROFILING_HISTORY_SIZE.key())); + this.profilingResultDir = configs.getString(RestOptions.PROFILING_RESULT_DIR); + this.scheduledExecutor = + Executors.newSingleThreadScheduledExecutor( + new ExecutorThreadFactory.Builder() + .setPoolName("flink-profiling-service") + .build()); + } + + public CompletableFuture<ProfilingInfo> requestProfiling( + String resourceID, long duration, ProfilingInfo.ProfilingMode mode) { + if (profilingFuture != null && !profilingFuture.isDone()) { + return FutureUtils.completedExceptionally( + new IllegalStateException(resourceID + " is still under profiling.")); + } + ProfilingInfo profilingInfo = ProfilingInfo.create(duration, mode); + profilingMap.putIfAbsent(resourceID, new ArrayDeque<>()); + profilingMap.get(resourceID).addFirst(profilingInfo); + AsyncProfiler profiler = AsyncProfiler.getInstance(); + try { + String response = + profiler.execute( + ProfilerConstants.COMMAND_START.msg + + profilingInfo.getProfilingMode().getCode()); + if 
(StringUtils.isNullOrWhitespaceOnly(response) + || !response.startsWith(ProfilerConstants.PROFILER_STARTED_SUCCESS.msg)) { + return CompletableFuture.completedFuture( + profilingInfo.fail("Start profiler failed. " + response)); + } + } catch (Exception e) { + return CompletableFuture.completedFuture( + profilingInfo.fail("Start profiler failed. " + e)); + } + + this.profilingFuture = + scheduledExecutor.schedule( + () -> stopProfiling(resourceID), duration, TimeUnit.SECONDS); + + return CompletableFuture.completedFuture(profilingInfo); + } + + public String getProfilingResultDir() { + return profilingResultDir; + } + + private void stopProfiling(String resourceID) { + AsyncProfiler profiler = AsyncProfiler.getInstance(); + ArrayDeque<ProfilingInfo> profilingList = profilingMap.get(resourceID); + Preconditions.checkState(!CollectionUtil.isNullOrEmpty(profilingList)); + ProfilingInfo info = profilingList.getFirst(); + try { + String fileName = formatOutputFileName(resourceID); + String outputPath = new File(profilingResultDir, fileName).getPath(); + String response = profiler.execute(ProfilerConstants.COMMAND_STOP.msg + outputPath); + if (!StringUtils.isNullOrWhitespaceOnly(response) + && response.startsWith(ProfilerConstants.PROFILER_STOPPED_SUCCESS.msg)) { + info.success(fileName); + } else { + info.fail("Stop profiler failed. " + response); + } + rollingClearing(profilingList); + } catch (Throwable e) { + info.fail("Stop profiler failed. " + e); + } + } + + private void rollingClearing(ArrayDeque<ProfilingInfo> profilingList) { + while (profilingList.size() > historySize) { + ProfilingInfo info = profilingList.pollLast(); + String outputFile = info.getOutputFile(); + if (StringUtils.isNullOrWhitespaceOnly(outputFile)) { + continue; + } + try { + Files.deleteIfExists(Paths.get(profilingResultDir, outputFile)); + } catch (Exception e) { + LOG.error(String.format("Clearing file for %s failed. 
Skipped.", info), e); + } + } + } + + private String formatOutputFileName(String resourceID) { + return String.format("%s_%s.html", resourceID, sdf.format(new Date())); Review Comment: Yes, that's true. Added. ########## docs/layouts/shortcodes/generated/expert_rest_section.html: ########## @@ -86,6 +86,30 @@ <td>Long</td> <td>The maximum time in ms for a connection to stay idle before failing.</td> </tr> + <tr> + <td><h5>rest.profiling.dir</h5></td> + <td style="word-wrap: break-word;">"/var/folders/19/05mc16fs0p13z3sj2y0qkzx00000ks/T/"</td> Review Comment: Thanks for the reminder. I think it will be correct after adding `@Documentation.OverrideDefault("System.getProperty(\"java.io.tmpdir\")")` to `RestOptions#PROFILING_RESULT_DIR`. ########## flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ProfilingInfo.java: ########## @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + +import java.io.Serializable; + +/** Contains information of a Profiling Instance.
*/ +public class ProfilingInfo implements ResponseBody, Serializable { + private static final long serialVersionUID = 1L; + public static final String FIELD_NAME_STATUS = "status"; + public static final String FIELD_NAME_MODE = "mode"; + public static final String FIELD_NAME_TRIGGER_TIME = "triggerTime"; + public static final String FIELD_NAME_FINISHED_TIME = "finishedTime"; + public static final String FIELD_NAME_DURATION = "duration"; + public static final String FIELD_NAME_MESSAGE = "message"; + public static final String FIELD_NAME_OUTPUT_PATH = "file"; Review Comment: Replaced with `outputFile` for consistency. ########## flink-runtime-web/web-dashboard/src/app/pages/job-manager/profiler/job-manager-profiler.component.html: ########## @@ -0,0 +1,108 @@ +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License.
+ --> + +<nz-card nzSize="small" style="width: 100%; height: 100%"> + <div style="margin-bottom: 10px"> + <form nz-form [nzLayout]="'inline'"> + <nz-form-item> + <nz-form-label nzFor="duration">Profiling Duration</nz-form-label> + <nz-form-control> + <nz-input-number + [(ngModel)]="duration" + [nzMin]="1" + [nzStep]="30" + [nzFormatter]="formatterDuration" + [nzParser]="parserDuration" + nzPlaceHolder="Duration" + name="duration" + ></nz-input-number> + </nz-form-control> + </nz-form-item> + <nz-form-item> + <nz-form-label nzFor="mode">Profiling Mode</nz-form-label> + <nz-form-control> + <nz-select + [(ngModel)]="selectMode" + nzPlaceHolder="Profiling Mode" + name="mode" + style="width: 200px" + > + <nz-option nzValue="CPU" nzLabel="CPU"></nz-option> + <nz-option nzValue="LOCK" nzLabel="Lock"></nz-option> + <nz-option nzValue="WALL" nzLabel="Wall-Clock"></nz-option> + <nz-option nzValue="ALLOC" nzLabel="Allocation"></nz-option> + <nz-option nzValue="ITIMER" nzLabel="ITIMER"></nz-option> + </nz-select> + </nz-form-control> + </nz-form-item> + <nz-form-item> + <nz-form-control> + <button + nz-button + nzType="primary" + [nzLoading]="isCreating" + [disabled]="duration === null || !isEnabled" + (click)="createProfilingInstance()" + style="margin-left: 10px" + > + Create Profiling Instance + </button> + </nz-form-control> + </nz-form-item> + </form> + <nz-alert + *ngIf="!isEnabled" + nzType="warning" + style="margin-top: 10px" + nzShowIcon + nzMessage="You need to set the config `rest.profiling.enabled=true` to enable this experimental profiler feature." 
+ ></nz-alert> + </div> + <nz-table + [nzSize]="'small'" + [nzData]="profilingList" + [nzLoading]="isLoading" + [nzFrontPagination]="false" + [nzShowPagination]="false" + > + <thead> + <tr> + <th nzWidth="5%">Index</th> + <th nzWidth="15%">Trigger Time</th> + <th nzWidth="15%">Finished Time</th> + <th nzWidth="10%">Profiling Duration</th> + <th nzWidth="5%">Mode</th> + <th nzWidth="10%">Status</th> + <th nzWidth="15%">Message</th> + <th nzWidth="25%">Link</th> + </tr> + </thead> + <tbody> + <tr *ngFor="let info of profilingList; let id = index"> + <td>{{ id }}</td> + <td>{{ info.triggerTime | humanizeWatermarkToDatetime }}</td> + <td>{{ info.finishedTime | humanizeWatermarkToDatetime }}</td> + <td>{{ info.duration }} s</td> + <td>{{ info.mode }}</td> + <td>{{ info.status }}</td> + <td>{{ info.message }}</td> + <td> + <a (click)="downloadProfilingResult(info.file)">{{ info.file }}</a> Review Comment: Okay, the message is now set to 'Profiling Successful' once the profiling instance finishes. ########## docs/layouts/shortcodes/generated/rest_configuration.html: ########## @@ -116,6 +116,30 @@ <td>Integer</td> <td>The port that the client connects to. If rest.bind-port has not been specified, then the REST server will bind to this port. Attention: This option is respected only if the high-availability configuration is NONE.</td> </tr> + <tr> + <td><h5>rest.profiling.dir</h5></td> + <td style="word-wrap: break-word;">"/var/folders/19/05mc16fs0p13z3sj2y0qkzx00000ks/T/"</td> Review Comment: Fixed. ########## flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java: ########## @@ -295,4 +295,40 @@ public class RestOptions { .defaultValue(Duration.ofMinutes(5)) .withDescription( "Maximum duration that the result of an async operation is stored. Once elapsed the result of the operation can no longer be retrieved."); + + /** Enables the experimental profiler feature.
*/ + @Documentation.Section(Documentation.Sections.EXPERT_REST) + public static final ConfigOption<Boolean> ENABLE_PROFILER = + key("rest.profiling.enabled") + .booleanType() + .defaultValue(false) + .withDescription("Enables the experimental profiler feature."); + + /** Maximum history size of profiling list. */ + @Documentation.Section(Documentation.Sections.EXPERT_REST) + public static final ConfigOption<Integer> MAX_PROFILING_HISTORY_SIZE = + key("rest.profiling.history-size") + .intType() + .defaultValue(10) + .withDescription( + "Maximum profiling history instance to be maintained for JobManager or each TaskManager. " + + "The oldest instance will be removed on a rolling basis when the history size exceeds this value."); + + /** Maximum profiling duration for profiling function. */ + @Documentation.Section(Documentation.Sections.EXPERT_REST) + public static final ConfigOption<Integer> MAX_PROFILING_DURATION = + key("rest.profiling.duration-max") + .intType() + .defaultValue(300) + .withDescription( + "Maximum profiling duration for each profiling request. " Review Comment: Yes, I have replaced this option with DurationType. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
