Myasuka commented on code in PR #23347:
URL: https://github.com/apache/flink/pull/23347#discussion_r1349497297
##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointExpiredThreadDumper.java:
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.util.JvmUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Consumer;
+
+/**
+ * A worker that can print thread dump to log when checkpoint aborted. There is a throttling
+ * strategy that it can only output once for each job and each checkpoint abortion.
+ */
+public class CheckpointExpiredThreadDumper {
+
+    private static final Logger LOG = LoggerFactory.getLogger(CheckpointExpiredThreadDumper.class);
+
+    /**
+     * When a checkpoint is notified timeout, the task manager could produce a thread dump to log
+     * for debugging purpose. This enum defines the log level of this thread dump.
+     */
+    public enum ThreadDumpLogLevel {
+        ERROR,
+        WARN,
+        INFO,
+        DEBUG,
+        TRACE,
+        OFF
+    }
+
+    /** A map records the last dumped checkpoint for each job. */
+    private final Map<JobID, Long> jobIdLastDumpCheckpointIdMap;
+
+    public CheckpointExpiredThreadDumper() {
+        this.jobIdLastDumpCheckpointIdMap = new HashMap<>();
+    }
+
+    /**
+     * Print the thread dump to log if needed. A thread dump is captured and print only when:
+     *
+     * <p>1. It is the first time to request a thread dump for this checkpoint within current job.
+     *
+     * <p>2. The configured log level is enabled.
+     *
+     * @param jobId id of the current job.
+     * @param checkpointId id of the aborted checkpoint.
+     * @param threadDumpLogLevelWhenAbort the configured log level for thread dump.
+     * @param maxDepthOfThreadDump the max stack depth for thread dump.
+     * @return true if really print the log.
+     */
+    public boolean threadDumpIfNeeded(
+            JobID jobId,
+            long checkpointId,
+            ThreadDumpLogLevel threadDumpLogLevelWhenAbort,
+            int maxDepthOfThreadDump) {
+        synchronized (this) {
+            long lastCheckpointId = jobIdLastDumpCheckpointIdMap.computeIfAbsent(jobId, (k) -> -1L);
+            if (lastCheckpointId >= checkpointId) {
+                return false;
+            }
+            jobIdLastDumpCheckpointIdMap.put(jobId, checkpointId);
+        }

Review Comment:
   I think we can make `jobIdLastDumpCheckpointIdMap` a `ConcurrentHashMap` here to avoid the expensive `synchronized (this)`. How about the code below:
   ~~~java
   AtomicBoolean needDump = new AtomicBoolean(false);
   jobIdLastDumpCheckpointIdMap.compute(
           jobId,
           (k, v) -> {
               long lastCheckpointId = v == null ? -1L : v;
               needDump.set(lastCheckpointId < checkpointId);
               return Math.max(lastCheckpointId, checkpointId);
           });
   if (needDump.get()) {
       ......
   }
   ~~~

##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinator.java:
##########
@@ -101,6 +101,14 @@ void notifyCheckpointSubsumed(
     /** Waits for all the pending checkpoints to finish their asynchronous step. */
     void waitForPendingCheckpoints() throws Exception;
 
+    /**
+     * Check whether a checkpoint is registered (running) in this coordinator.
+     *
+     * @param checkpointId of the target checkpoint
+     * @return true if the checkpoint is running.
+     */
+    boolean checkCheckpointRegistered(long checkpointId);

Review Comment:
   I think `checkCheckpointRegistered` ("check-check") reads a bit oddly; how about `isCheckpointRegistered`?

##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java:
##########
@@ -1316,6 +1334,24 @@ public void abortCheckpointOnBarrier(long checkpointId, CheckpointException caus
         subtaskCheckpointCoordinator.abortCheckpointOnBarrier(checkpointId, cause, operatorChain);
     }
 
+    @Override
+    public Future<Void> threadDumpOnCheckpointTimeout(long checkpointId) {
+        if (checkpointExpiredThreadDumper != null) {
+            return notifyCheckpointOperation(
+                    () -> {
+                        if (isRunning()
+                                && subtaskCheckpointCoordinator.checkCheckpointRegistered(
+                                        checkpointId)) {
+                            checkpointExpiredThreadDumper.threadDumpIfNeeded(checkpointId);
+                        }
+                    },
+                    "Request thread dump when checkpoint timeout");
+        }
+        CompletableFuture<Void> result = new CompletableFuture<>();
+        result.complete(null);
+        return result;

Review Comment:
   `return FutureUtils.completedVoidFuture();` would be better here, because it avoids creating an unnecessary object.
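   The idea behind `FutureUtils.completedVoidFuture()` is to hand out one shared, already-completed future instead of allocating a new one per call. Below is a minimal JDK-only sketch of that idea. `VoidFutures` and `TimeoutHandler` are hypothetical names, not Flink classes. `CompletableFuture.runAsync` only stands in for the PR's `notifyCheckpointOperation`.

   ```java
   import java.util.concurrent.CompletableFuture;

   /** Hypothetical JDK-only stand-in for the shared future behind FutureUtils.completedVoidFuture(). */
   final class VoidFutures {

       // Completed once during class initialization and then shared by every caller,
       // so the "nothing to do" path allocates no new future.
       private static final CompletableFuture<Void> COMPLETED_VOID_FUTURE =
               CompletableFuture.completedFuture(null);

       private VoidFutures() {}

       static CompletableFuture<Void> completedVoidFuture() {
           return COMPLETED_VOID_FUTURE;
       }
   }

   /** Hypothetical caller shaped like StreamTask#threadDumpOnCheckpointTimeout. */
   class TimeoutHandler {

       private final Runnable dumpAction; // null when thread dumps are disabled

       TimeoutHandler(Runnable dumpAction) {
           this.dumpAction = dumpAction;
       }

       // checkpointId is unused in this sketch; it only mirrors the PR's signature.
       CompletableFuture<Void> threadDumpOnCheckpointTimeout(long checkpointId) {
           if (dumpAction != null) {
               // The PR routes the action through notifyCheckpointOperation;
               // runAsync is only a placeholder here.
               return CompletableFuture.runAsync(dumpAction);
           }
           // Disabled: reuse the shared completed future instead of allocating a new one.
           return VoidFutures.completedVoidFuture();
       }
   }
   ```

   Sharing one instance is safe only if callers treat it as read-only and never call `complete` or `obtrudeValue` on it.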
##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java:
##########
@@ -431,4 +432,9 @@ public void setCheckpointResponder(CheckpointResponder checkpointResponder) {
     public ChannelStateWriteRequestExecutorFactory getChannelStateExecutorFactory() {
         return channelStateExecutorFactory;
     }
+
+    @Override
+    public CheckpointExpiredThreadDumper getCheckpointExpiredThreadDumper() {
+        return null;

Review Comment:
   I prefer to use `Optional` here, following Flink's [code-style](https://flink.apache.org/how-to-contribute/code-style-and-quality-java/#java-optional).

##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java:
##########
@@ -314,4 +317,28 @@ public class ExecutionCheckpointingOptions {
                             "Defines the maximum number of subtasks that share the same channel state file. "
                                     + "It can reduce the number of small files when enable unaligned checkpoint. "
                                     + "Each subtask will create a new channel state file when this is configured to 1.");
+
+    public static final ConfigOption<ThreadDumpLogLevel>
+            THREAD_DUMP_LOG_LEVEL_WHEN_CHECKPOINT_TIMEOUT =
+                    key("execution.checkpointing.thread-dump-log-level-when-checkpoint-timeout")

Review Comment:
   I think printing the thread dump is useful not only when a checkpoint expires. We could also enable it when the job is heavily back-pressured. Thus, I suggest splitting this option into two. One would be for `thread-dump-log-level`, and the other for the scenarios in which a dump is printed. Examples include when a checkpoint expires, when a task is very busy, when a task is failing, or when the process is about to exit due to a fatal error. In this PR, we can support only the expired-checkpoint scenario.
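   Below is a minimal JDK-only sketch of the suggested split. One setting controls the log level, and a second, independent setting lists the scenarios that trigger a dump. In Flink these would presumably become two `ConfigOption`s: an enum-typed level option and a list-typed scenario option. Here, `ThreadDumpScenario`, `ThreadDumpSettings`, the scenario names, and the `;`-separated value format are all hypothetical.

   ```java
   import java.util.EnumSet;
   import java.util.Locale;

   /** Same constants as ThreadDumpLogLevel in the PR. */
   enum ThreadDumpLogLevel { ERROR, WARN, INFO, DEBUG, TRACE, OFF }

   /** Hypothetical scenarios taken from the review comment; names are illustrative only. */
   enum ThreadDumpScenario { CHECKPOINT_EXPIRED, TASK_BUSY, TASK_FAILING, FATAL_ERROR_EXIT }

   /** Hypothetical holder for the two separate settings: how to log, and when to dump. */
   final class ThreadDumpSettings {

       private final ThreadDumpLogLevel logLevel;
       private final EnumSet<ThreadDumpScenario> scenarios;

       ThreadDumpSettings(ThreadDumpLogLevel logLevel, EnumSet<ThreadDumpScenario> scenarios) {
           this.logLevel = logLevel;
           this.scenarios = EnumSet.copyOf(scenarios); // defensive copy; fine for an empty EnumSet
       }

       /**
        * Parses a level such as "warn" and a ';'-separated scenario list such as
        * "checkpoint-expired;task-busy". The value format is an assumption of this sketch.
        */
       static ThreadDumpSettings parse(String level, String scenarioList) {
           EnumSet<ThreadDumpScenario> parsed = EnumSet.noneOf(ThreadDumpScenario.class);
           for (String raw : scenarioList.split(";")) {
               String name = raw.trim();
               if (!name.isEmpty()) {
                   parsed.add(
                           ThreadDumpScenario.valueOf(
                                   name.toUpperCase(Locale.ROOT).replace('-', '_')));
               }
           }
           return new ThreadDumpSettings(
                   ThreadDumpLogLevel.valueOf(level.trim().toUpperCase(Locale.ROOT)), parsed);
       }

       /** A dump is taken only if the level is not OFF and the scenario is enabled. */
       boolean shouldDump(ThreadDumpScenario scenario) {
           return logLevel != ThreadDumpLogLevel.OFF && scenarios.contains(scenario);
       }

       ThreadDumpLogLevel logLevel() {
           return logLevel;
       }
   }
   ```

   For this PR the scenario set would just be `{CHECKPOINT_EXPIRED}`. Later scenarios could then be added without renaming or redefining the log-level option.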