xintongsong commented on code in PR #21347:
URL: https://github.com/apache/flink/pull/21347#discussion_r1057133875
##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java:
##########
@@ -652,4 +654,12 @@ public void setJobStatusHooks(List<JobStatusHook> hooks) {
public List<JobStatusHook> getJobStatusHooks() {
return this.jobStatusHooks;
}
+
+ public long getClientHeartbeatTimeout() {
+ return clientHeartbeatTimeout;
+ }
+
+ public void setClientHeartbeatTimeout(long clientHeartbeatTimeout) {
+ this.clientHeartbeatTimeout = clientHeartbeatTimeout;
+ }
Review Comment:
1. I'd suggest naming these `get/setInitialClientHeartbeatTimeout`.
2. This might be better placed in `jobConfiguration`, rather
than being a direct field of `JobGraph`.
##########
flink-clients/src/main/java/org/apache/flink/client/cli/ClientOptions.java:
##########
@@ -42,4 +47,35 @@ public class ClientOptions {
.withDescription(
"The interval (in ms) between consecutive retries of failed attempts to execute "
+ "commands through the CLI or Flink's clients, wherever retry is supported (default 2sec).");
+
+ /** Timeout for job client to report its heartbeat. */
+ public static final ConfigOption<Long> CLIENT_HEARTBEAT_TIMEOUT =
+ key("client.heartbeat.timeout")
+ .longType()
+ .defaultValue(18000L)
+ .withDescription(
+ Description.builder()
+ .text(
+ "Cancel the job if the dispatcher hasn't received the client's"
+ + " heartbeat after timeout when '%s' and '%s' are both true.",
+ TextElement.text(DeploymentOptions.ATTACHED.key()),
+ TextElement.text(
+ DeploymentOptions.SHUTDOWN_IF_ATTACHED.key()))
+ .build());
+
+ /** Time interval for job client to report its heartbeat. */
+ public static final ConfigOption<Long> CLIENT_HEARTBEAT_INTERVAL =
+ key("client.heartbeat.interval")
+ .longType()
+ .defaultValue(30000L)
+ .withDescription(
+ Description.builder()
+ .text(
+ "Time interval for job client to report its heartbeat "
+ + "when '%s' and '%s' are both true. Cancel the job if timeout configured by '%s'.",
+ TextElement.text(DeploymentOptions.ATTACHED.key()),
+ TextElement.text(
+ DeploymentOptions.SHUTDOWN_IF_ATTACHED.key()),
+ TextElement.text(CLIENT_HEARTBEAT_TIMEOUT.key()))
+ .build());
Review Comment:
The default values don't look right: the interval (30000) is larger than the
timeout (18000), so a healthy client could be timed out between two heartbeats.
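
To make the concern concrete, here is a minimal sketch of the kind of sanity check that would reject these defaults. `HeartbeatDefaultsCheck` and `isConsistent` are hypothetical names and not part of Flink, and the second pair of values is only an illustrative assumption, not a proposal from this PR.

```java
// Hypothetical helper, not part of Flink: checks that heartbeat settings are consistent.
final class HeartbeatDefaultsCheck {

    /** Returns true when the timeout is strictly larger than a positive interval. */
    static boolean isConsistent(long heartbeatInterval, long heartbeatTimeout) {
        return heartbeatInterval > 0 && heartbeatTimeout > heartbeatInterval;
    }

    public static void main(String[] args) {
        // Defaults from the diff above: interval 30000, timeout 18000.
        System.out.println(isConsistent(30000L, 18000L)); // prints false
        // Illustrative values only (assumed, not taken from the PR).
        System.out.println(isConsistent(30000L, 180000L)); // prints true
    }
}
```

A check like this could live wherever the two options are read together, so that a misconfiguration fails fast instead of cancelling a job whose client is still alive.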
##########
flink-core/src/main/java/org/apache/flink/configuration/ClusterOptions.java:
##########
@@ -205,6 +205,24 @@ public class ClusterOptions {
TextElement.code(PROCESS_WORKING_DIR_BASE.key()))
.build());
+ public static final ConfigOption<Long> CLIENT_ALIVENESS_CHECK_INTERVAL =
+ ConfigOptions.key("cluster.client-aliveness-check.interval")
+ .longType()
+ .defaultValue(60000L)
+ .withDescription(
+ Description.builder()
+ .text(
+ "The interval to check whether the client's "
+ + "heartbeat is timeout when '%s' and '%s' are "
+ + "both true. Cancel the job if timeout. The "
+ + "client's heartbeat interval and timeout are "
+ + "set by 'client.heartbeat.interval' and "
+ + "'client.heartbeat.timeout'.",
+ TextElement.text(DeploymentOptions.ATTACHED.key()),
+ TextElement.text(
+ DeploymentOptions.SHUTDOWN_IF_ATTACHED.key()))
+ .build());
+
Review Comment:
I'm not sure we need a dedicated config option for the checking interval.
The checking interval doesn't need to be very accurate, so we can probably
just use the configured client heartbeat interval on the cluster side.
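
A rough sketch of what that could look like, under stated assumptions: `ClientAlivenessTracker` and all of its methods are hypothetical names for illustration and not existing Flink classes, and timestamps are passed in explicitly to keep the logic easy to follow. The point is only that the check period is derived from the heartbeat interval instead of a separate option.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical cluster-side tracker; names are illustrative, not Flink APIs.
final class ClientAlivenessTracker {
    private final long heartbeatInterval; // reused as the check period
    private final long heartbeatTimeout;
    private final Map<String, Long> lastHeartbeat = new HashMap<>();

    ClientAlivenessTracker(long heartbeatInterval, long heartbeatTimeout) {
        this.heartbeatInterval = heartbeatInterval;
        this.heartbeatTimeout = heartbeatTimeout;
    }

    /** The period at which findExpired is meant to be called; no dedicated option. */
    long checkPeriod() {
        return heartbeatInterval;
    }

    /** Records the time of the latest heartbeat received for a job. */
    void reportHeartbeat(String jobId, long now) {
        lastHeartbeat.put(jobId, now);
    }

    /** Returns the jobs whose last heartbeat is older than the timeout. */
    List<String> findExpired(long now) {
        List<String> expired = new ArrayList<>();
        for (Map.Entry<String, Long> entry : lastHeartbeat.entrySet()) {
            if (now - entry.getValue() > heartbeatTimeout) {
                expired.add(entry.getKey());
            }
        }
        return expired;
    }
}
```

With this design a timeout may be detected up to one heartbeat interval late, which is the inaccuracy the comment above considers acceptable.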