Myasuka commented on code in PR #23347:
URL: https://github.com/apache/flink/pull/23347#discussion_r1349497297


##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointExpiredThreadDumper.java:
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.util.JvmUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Consumer;
+
+/**
+ * A worker that can print thread dump to log when checkpoint aborted. There is a throttling
+ * strategy that it can only output once for each job and each checkpoint abortion.
+ */
+public class CheckpointExpiredThreadDumper {
+
+    private static final Logger LOG = LoggerFactory.getLogger(CheckpointExpiredThreadDumper.class);
+
+    /**
+     * When a checkpoint is notified timeout, the task manager could produce a thread dump to log
+     * for debugging purpose. This enum defines the log level of this thread dump.
+     */
+    public enum ThreadDumpLogLevel {
+        ERROR,
+        WARN,
+        INFO,
+        DEBUG,
+        TRACE,
+        OFF
+    }
+
+    /** A map records the last dumped checkpoint for each job. */
+    private final Map<JobID, Long> jobIdLastDumpCheckpointIdMap;
+
+    public CheckpointExpiredThreadDumper() {
+        this.jobIdLastDumpCheckpointIdMap = new HashMap<>();
+    }
+
+    /**
+     * Print the thread dump to log if needed. A thread dump is captured and print only when:
+     *
+     * <p>1. It is the first time to request a thread dump for this checkpoint within current job.
+     *
+     * <p>2. The configured log level is enabled.
+     *
+     * @param jobId id of the current job.
+     * @param checkpointId id of the aborted checkpoint.
+     * @param threadDumpLogLevelWhenAbort the configured log level for thread dump.
+     * @param maxDepthOfThreadDump the max stack depth for thread dump.
+     * @return true if really print the log.
+     */
+    public boolean threadDumpIfNeeded(
+            JobID jobId,
+            long checkpointId,
+            ThreadDumpLogLevel threadDumpLogLevelWhenAbort,
+            int maxDepthOfThreadDump) {
+        synchronized (this) {
+            long lastCheckpointId = jobIdLastDumpCheckpointIdMap.computeIfAbsent(jobId, (k) -> -1L);
+            if (lastCheckpointId >= checkpointId) {
+                return false;
+            }
+            jobIdLastDumpCheckpointIdMap.put(jobId, checkpointId);
+        }

Review Comment:
   I think we can make `jobIdLastDumpCheckpointIdMap` a `ConcurrentHashMap` here
   to avoid the expensive `synchronized (this)` block.
   How about the code below:
   
   ~~~java
   AtomicBoolean needDump = new AtomicBoolean(false);
   jobIdLastDumpCheckpointIdMap.compute(
           jobId,
           (k, v) -> {
               long lastCheckpointId = v == null ? -1L : v;
               needDump.set(lastCheckpointId < checkpointId);
               return Math.max(lastCheckpointId, checkpointId);
           });
   if (needDump.get()) {
       ......
   }
   ~~~
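
   To make the suggestion above concrete, here is a minimal self-contained sketch using only the JDK. The class name `ThrottledDumpSketch`, the method `shouldDump`, and the `String` job key are hypothetical stand-ins (the PR keys the map by `JobID`); this is not the PR's actual implementation. It relies on `ConcurrentHashMap.compute` running the remapping function atomically for the given key.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical stand-in for the throttling part of CheckpointExpiredThreadDumper. */
class ThrottledDumpSketch {

    // Assumption: keyed by a plain String here; the PR keys this map by JobID.
    private final ConcurrentHashMap<String, Long> lastDumpedCheckpoint =
            new ConcurrentHashMap<>();

    /** Returns true only when checkpointId is newer than the last one recorded for the job. */
    boolean shouldDump(String jobId, long checkpointId) {
        AtomicBoolean needDump = new AtomicBoolean(false);
        // compute() applies the function atomically per key, so the compare-and-update
        // needs no external lock.
        lastDumpedCheckpoint.compute(
                jobId,
                (k, v) -> {
                    long last = v == null ? -1L : v;
                    needDump.set(last < checkpointId);
                    return Math.max(last, checkpointId);
                });
        return needDump.get();
    }
}
```

   With this shape, a repeated or older checkpoint id for the same job returns `false`, while a newer id or a new job returns `true`.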
   



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinator.java:
##########
@@ -101,6 +101,14 @@ void notifyCheckpointSubsumed(
     /** Waits for all the pending checkpoints to finish their asynchronous step. */
     void waitForPendingCheckpoints() throws Exception;
 
+    /**
+     * Check whether a checkpoint is registered (running) in this coordinator.
+     *
+     * @param checkpointId of the target checkpoint
+     * @return true if the checkpoint is running.
+     */
+    boolean checkCheckpointRegistered(long checkpointId);

Review Comment:
   I think `check-check` looks a bit weird. How about `isCheckpointRegistered`?



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java:
##########
@@ -1316,6 +1334,24 @@ public void abortCheckpointOnBarrier(long checkpointId, CheckpointException caus
         subtaskCheckpointCoordinator.abortCheckpointOnBarrier(checkpointId, cause, operatorChain);
     }
 
+    @Override
+    public Future<Void> threadDumpOnCheckpointTimeout(long checkpointId) {
+        if (checkpointExpiredThreadDumper != null) {
+            return notifyCheckpointOperation(
+                    () -> {
+                        if (isRunning()
+                                && subtaskCheckpointCoordinator.checkCheckpointRegistered(
+                                        checkpointId)) {
+                            checkpointExpiredThreadDumper.threadDumpIfNeeded(checkpointId);
+                        }
+                    },
+                    "Request thread dump when checkpoint timeout");
+        }
+        CompletableFuture<Void> result = new CompletableFuture<>();
+        result.complete(null);
+        return result;

Review Comment:
   `return FutureUtils.completedVoidFuture()` is better here, since it avoids
   creating unnecessary objects.
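
   As a rough JDK-only illustration of why a shared completed future avoids the allocation: the helper class `CompletedFutures` below is hypothetical and only mimics the idea; it is not Flink's `FutureUtils` source.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical JDK-only helper that hands out one shared, already-completed future. */
final class CompletedFutures {

    // Created once at class initialization and reused by every caller.
    private static final CompletableFuture<Void> COMPLETED_VOID =
            CompletableFuture.completedFuture(null);

    private CompletedFutures() {}

    /** Returns the shared completed future instead of allocating a new one per call. */
    static CompletableFuture<Void> completedVoidFuture() {
        return COMPLETED_VOID;
    }
}
```

   The trade-off is that the instance is shared, so callers should treat it as read-only.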



##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java:
##########
@@ -431,4 +432,9 @@ public void setCheckpointResponder(CheckpointResponder checkpointResponder) {
     public ChannelStateWriteRequestExecutorFactory getChannelStateExecutorFactory() {
         return channelStateExecutorFactory;
     }
+
+    @Override
+    public CheckpointExpiredThreadDumper getCheckpointExpiredThreadDumper() {
+        return null;

Review Comment:
   I prefer to return an `Optional` here instead of `null`, following Flink's
[code-style](https://flink.apache.org/how-to-contribute/code-style-and-quality-java/#java-optional)
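
   A minimal sketch of what the `Optional`-returning getter could look like. All types here (`DumperStub`, `EnvironmentSketch`, `MockEnvironmentSketch`, `TaskSketch`) are hypothetical stand-ins, not the real Flink `Environment` or dumper classes.

```java
import java.util.Optional;

/** Hypothetical dumper stand-in that only counts how often it was invoked. */
final class DumperStub {
    int dumps = 0;

    void dump(long checkpointId) {
        dumps++;
    }
}

/** Hypothetical slice of an environment interface exposing the optional dumper. */
interface EnvironmentSketch {
    Optional<DumperStub> getCheckpointExpiredThreadDumper();
}

/** Mock environment: signals "no dumper configured" with an empty Optional, not null. */
final class MockEnvironmentSketch implements EnvironmentSketch {
    @Override
    public Optional<DumperStub> getCheckpointExpiredThreadDumper() {
        return Optional.empty();
    }
}

final class TaskSketch {
    /** Dumps only when a dumper is present; returns whether a dumper was available. */
    static boolean dumpIfAvailable(EnvironmentSketch env, long checkpointId) {
        Optional<DumperStub> dumper = env.getCheckpointExpiredThreadDumper();
        dumper.ifPresent(d -> d.dump(checkpointId));
        return dumper.isPresent();
    }
}
```

   The caller then no longer needs an explicit `!= null` check before using the dumper.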



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java:
##########
@@ -314,4 +317,28 @@ public class ExecutionCheckpointingOptions {
                             "Defines the maximum number of subtasks that share the same channel state file. "
                                     + "It can reduce the number of small files when enable unaligned checkpoint. "
                                     + "Each subtask will create a new channel state file when this is configured to 1.");
+
+    public static final ConfigOption<ThreadDumpLogLevel>
+            THREAD_DUMP_LOG_LEVEL_WHEN_CHECKPOINT_TIMEOUT =
+                    key("execution.checkpointing.thread-dump-log-level-when-checkpoint-timeout")

Review Comment:
   I think printing the thread dump is useful not only when a checkpoint
   expires; we could also enable it when the job is under high back-pressure.
   Thus, I suggest splitting this parameter into two: one for the
   `thread-dump-log-level`, and another for the scenarios that trigger a dump.
   For example, we could print the thread dump when a checkpoint expires, when
   a task is very busy, when a task is failing, or when the process is about to
   exit due to a fatal error.
   In this PR, we can support only the expired-checkpoint scenario.
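
   To illustrate the proposed split, here is a plain-Java sketch that keeps the log level and the triggering scenarios as two separate settings. Every name in it (`DumpLogLevel`, `DumpScenario`, `ThreadDumpSettings`) is hypothetical; it does not use Flink's `ConfigOption` API and only models the decision logic.

```java
import java.util.EnumSet;
import java.util.Set;

/** Hypothetical log levels, mirroring the ThreadDumpLogLevel enum in the PR. */
enum DumpLogLevel { ERROR, WARN, INFO, DEBUG, TRACE, OFF }

/** Hypothetical scenarios taken from the examples in the comment above. */
enum DumpScenario { CHECKPOINT_EXPIRED, TASK_BUSY, TASK_FAILING, FATAL_ERROR }

/** Hypothetical holder for the two independent settings. */
final class ThreadDumpSettings {
    private final DumpLogLevel level;
    private final Set<DumpScenario> scenarios;

    ThreadDumpSettings(DumpLogLevel level, Set<DumpScenario> scenarios) {
        this.level = level;
        // EnumSet.copyOf rejects an empty non-EnumSet collection, so handle that case.
        this.scenarios =
                scenarios.isEmpty()
                        ? EnumSet.noneOf(DumpScenario.class)
                        : EnumSet.copyOf(scenarios);
    }

    /** A dump happens only if logging is not OFF and the scenario is enabled. */
    boolean shouldDump(DumpScenario scenario) {
        return level != DumpLogLevel.OFF && scenarios.contains(scenario);
    }
}
```

   Under this shape, supporting only expired checkpoints in this PR would mean enabling just `CHECKPOINT_EXPIRED`, with the other scenarios added later without touching the log-level setting.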


