xintongsong commented on code in PR #21347:
URL: https://github.com/apache/flink/pull/21347#discussion_r1057133875


##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java:
##########
@@ -652,4 +654,12 @@ public void setJobStatusHooks(List<JobStatusHook> hooks) {
     public List<JobStatusHook> getJobStatusHooks() {
         return this.jobStatusHooks;
     }
+
+    public long getClientHeartbeatTimeout() {
+        return clientHeartbeatTimeout;
+    }
+
+    public void setClientHeartbeatTimeout(long clientHeartbeatTimeout) {
+        this.clientHeartbeatTimeout = clientHeartbeatTimeout;
+    }

Review Comment:
   1. I'd suggest naming this `get/setInitialClientHeartbeatTimeout`.
   2. This might be better placed in `jobConfiguration` rather than as a direct field of `JobGraph`.
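Here is a minimal sketch of both suggestions. It assumes the value is stored as a plain string entry in the job configuration. `JobGraphSketch` is a hypothetical stand-in, not Flink's `JobGraph`. It models `jobConfiguration` as a `Map` so the example runs without Flink on the classpath. In Flink itself the entry would presumably go into the `Configuration` returned by `JobGraph#getJobConfiguration()`. Reusing the PR's `client.heartbeat.timeout` key is also an assumption.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/**
 * Hypothetical stand-in for JobGraph (not Flink's class). It keeps the
 * initial client heartbeat timeout in the job configuration instead of a
 * dedicated field, using the "Initial..." accessor names suggested above.
 */
final class JobGraphSketch {
    // Assumed key, mirroring the PR's "client.heartbeat.timeout" option.
    static final String INITIAL_CLIENT_HEARTBEAT_TIMEOUT_KEY = "client.heartbeat.timeout";

    // Plain map standing in for JobGraph's jobConfiguration.
    private final Map<String, String> jobConfiguration = new HashMap<>();

    Map<String, String> getJobConfiguration() {
        return jobConfiguration;
    }

    void setInitialClientHeartbeatTimeout(long timeout) {
        jobConfiguration.put(INITIAL_CLIENT_HEARTBEAT_TIMEOUT_KEY, Long.toString(timeout));
    }

    // Empty if the client never set a timeout (e.g. heartbeats not enabled).
    Optional<Long> getInitialClientHeartbeatTimeout() {
        return Optional.ofNullable(jobConfiguration.get(INITIAL_CLIENT_HEARTBEAT_TIMEOUT_KEY))
                .map(Long::parseLong);
    }
}
```

Keeping the value in the configuration also lets it travel with the rest of the job settings, without another field to serialize on `JobGraph`.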



##########
flink-clients/src/main/java/org/apache/flink/client/cli/ClientOptions.java:
##########
@@ -42,4 +47,35 @@ public class ClientOptions {
                     .withDescription(
                             "The interval (in ms) between consecutive retries of failed attempts to execute "
                                     + "commands through the CLI or Flink's clients, wherever retry is supported (default 2sec).");
+
+    /** Timeout for job client to report its heartbeat. */
+    public static final ConfigOption<Long> CLIENT_HEARTBEAT_TIMEOUT =
+            key("client.heartbeat.timeout")
+                    .longType()
+                    .defaultValue(18000L)
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "Cancel the job if the dispatcher hasn't received the client's"
+                                                    + " heartbeat after timeout when '%s' and '%s' are both true.",
+                                            TextElement.text(DeploymentOptions.ATTACHED.key()),
+                                            TextElement.text(
+                                                    DeploymentOptions.SHUTDOWN_IF_ATTACHED.key()))
+                                    .build());
+
+    /** Time interval for job client to report its heartbeat. */
+    public static final ConfigOption<Long> CLIENT_HEARTBEAT_INTERVAL =
+            key("client.heartbeat.interval")
+                    .longType()
+                    .defaultValue(30000L)
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "Time interval for job client to report its heartbeat "
+                                                    + "when '%s' and '%s' are both true. Cancel the job if timeout configured by '%s'.",
+                                            TextElement.text(DeploymentOptions.ATTACHED.key()),
+                                            TextElement.text(
+                                                    DeploymentOptions.SHUTDOWN_IF_ATTACHED.key()),
+                                            TextElement.text(CLIENT_HEARTBEAT_TIMEOUT.key()))
+                                    .build());

Review Comment:
   The default values don't look right: the default interval (30000) is larger than the default timeout (18000).
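One way to guard against this is to reject any configuration whose interval is not strictly smaller than the timeout. The sketch below assumes both options use the same unit. `HeartbeatOptionsCheck` and its methods are hypothetical names, not part of the PR.

```java
/**
 * Hypothetical validation for the PR's client heartbeat options.
 * Both arguments must be expressed in the same unit.
 */
final class HeartbeatOptionsCheck {

    private HeartbeatOptionsCheck() {}

    /** Throws unless the client heartbeats strictly more often than the timeout. */
    static void validate(long heartbeatInterval, long heartbeatTimeout) {
        if (heartbeatInterval <= 0 || heartbeatTimeout <= 0) {
            throw new IllegalArgumentException("Heartbeat interval and timeout must be positive.");
        }
        if (heartbeatInterval >= heartbeatTimeout) {
            throw new IllegalArgumentException(
                    String.format(
                            "client.heartbeat.interval (%d) must be smaller than client.heartbeat.timeout (%d).",
                            heartbeatInterval, heartbeatTimeout));
        }
    }

    /** Convenience wrapper that returns false instead of throwing. */
    static boolean isValid(long heartbeatInterval, long heartbeatTimeout) {
        try {
            validate(heartbeatInterval, heartbeatTimeout);
            return true;
        } catch (IllegalArgumentException e) {
            return false;
        }
    }
}
```

With the PR's defaults, `isValid(30000L, 18000L)` returns `false`. For comparison, Flink's existing cluster-side `heartbeat.interval` and `heartbeat.timeout` options default to 10000 and 50000 ms. Making the timeout a multiple of the interval means a single delayed heartbeat does not cancel the job. The exact values are for the PR author to choose.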



##########
flink-core/src/main/java/org/apache/flink/configuration/ClusterOptions.java:
##########
@@ -205,6 +205,24 @@ public class ClusterOptions {
                                             TextElement.code(PROCESS_WORKING_DIR_BASE.key()))
                                     .build());
 
+    public static final ConfigOption<Long> CLIENT_ALIVENESS_CHECK_INTERVAL =
+            ConfigOptions.key("cluster.client-aliveness-check.interval")
+                    .longType()
+                    .defaultValue(60000L)
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "The interval to check whether the client's "
+                                                    + "heartbeat is timeout when '%s' and '%s' are "
+                                                    + "both true. Cancel the job if timeout. The "
+                                                    + "client's heartbeat interval and timeout are "
+                                                    + "set by 'client.heartbeat.interval' and "
+                                                    + "'client.heartbeat.timeout'.",
+                                            TextElement.text(DeploymentOptions.ATTACHED.key()),
+                                            TextElement.text(
+                                                    DeploymentOptions.SHUTDOWN_IF_ATTACHED.key()))
+                                    .build());
+

Review Comment:
   I'm not sure we need a dedicated config option for the checking interval. The check doesn't need to be very accurate, so we can probably just reuse the configured client heartbeat interval on the cluster side.
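Here is a rough sketch of the suggested approach. It assumes the cluster knows the client heartbeat interval (the thread doesn't say how it obtains it). The checker uses that interval as its checking period, so no `cluster.client-aliveness-check.interval` option is needed. `ClientAlivenessChecker` is a hypothetical class. The `onClientTimeout` callback stands in for whatever cancels the job. The clock is injectable only to keep the logic testable.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.LongSupplier;

/**
 * Hypothetical cluster-side checker (not part of Flink). Its checking period
 * is the client heartbeat interval rather than a dedicated cluster option.
 */
final class ClientAlivenessChecker {
    private final long timeoutMs;
    private final LongSupplier clockMs;     // current time in ms; injectable for tests
    private final Runnable onClientTimeout; // e.g. cancel the job
    private final AtomicBoolean timedOut = new AtomicBoolean(false);
    private volatile long lastHeartbeatMs;

    ClientAlivenessChecker(long timeoutMs, LongSupplier clockMs, Runnable onClientTimeout) {
        this.timeoutMs = timeoutMs;
        this.clockMs = clockMs;
        this.onClientTimeout = onClientTimeout;
        this.lastHeartbeatMs = clockMs.getAsLong(); // treat creation as the first heartbeat
    }

    /** Records a heartbeat received from the client. */
    void reportHeartbeat() {
        lastHeartbeatMs = clockMs.getAsLong();
    }

    /** Runs one check; fires the timeout action at most once. Returns true once timed out. */
    boolean checkOnce() {
        boolean expired = clockMs.getAsLong() - lastHeartbeatMs > timeoutMs;
        if (expired && timedOut.compareAndSet(false, true)) {
            onClientTimeout.run();
        }
        return timedOut.get();
    }

    /** Schedules periodic checks, reusing the client heartbeat interval as the period. */
    ScheduledFuture<?> start(ScheduledExecutorService executor, long clientHeartbeatIntervalMs) {
        return executor.scheduleAtFixedRate(
                this::checkOnce, clientHeartbeatIntervalMs, clientHeartbeatIntervalMs, TimeUnit.MILLISECONDS);
    }
}
```

Checks run once per heartbeat interval, so a lost client is detected up to roughly one interval after its timeout expires. That is acceptable because the check doesn't need to be precise.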



-- 
This is an automated message from the Apache Git Service.