This is an automated email from the ASF dual-hosted git repository.
xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new be240dea803 [FLINK-29640] Enhance the function configured by
execution.shutdown-on-attached-exit by heartbeat between client and dispatcher
be240dea803 is described below
commit be240dea803fc82299c4711ebe656c1c4b159ca9
Author: liujiangang <[email protected]>
AuthorDate: Fri Nov 18 17:28:52 2022 +0800
[FLINK-29640] Enhance the function configured by
execution.shutdown-on-attached-exit by heartbeat between client and dispatcher
This closes #21347
---
.../shortcodes/generated/client_configuration.html | 12 ++
.../java/org/apache/flink/client/ClientUtils.java | 41 +++++++
.../org/apache/flink/client/cli/ClientOptions.java | 36 ++++++
.../deployment/ClusterClientJobClientAdapter.java | 7 ++
.../executors/PipelineExecutorUtils.java | 8 ++
.../apache/flink/client/program/ClusterClient.java | 11 ++
.../flink/client/program/ContextEnvironment.java | 12 ++
.../flink/client/program/MiniClusterClient.java | 5 +
.../client/program/StreamContextEnvironment.java | 12 ++
.../client/program/rest/RestClusterClient.java | 15 +++
.../apache/flink/client/ClientHeartbeatTest.java | 129 +++++++++++++++++++++
.../org/apache/flink/core/execution/JobClient.java | 3 +
.../src/test/resources/rest_api_v1.snapshot | 26 +++++
.../flink/runtime/dispatcher/Dispatcher.java | 85 ++++++++++++++
.../apache/flink/runtime/jobgraph/JobGraph.java | 10 ++
.../flink/runtime/minicluster/MiniCluster.java | 7 ++
.../runtime/minicluster/MiniClusterJobClient.java | 5 +
.../handler/job/JobClientHeartbeatHandler.java | 88 ++++++++++++++
.../rest/messages/JobClientHeartbeatHeaders.java | 75 ++++++++++++
.../messages/JobClientHeartbeatParameters.java | 45 +++++++
.../messages/JobClientHeartbeatRequestBody.java | 41 +++++++
.../flink/runtime/webmonitor/RestfulGateway.java | 7 ++
.../runtime/webmonitor/WebMonitorEndpoint.java | 13 +++
23 files changed, 693 insertions(+)
diff --git a/docs/layouts/shortcodes/generated/client_configuration.html
b/docs/layouts/shortcodes/generated/client_configuration.html
index e292d794f1c..1af59376474 100644
--- a/docs/layouts/shortcodes/generated/client_configuration.html
+++ b/docs/layouts/shortcodes/generated/client_configuration.html
@@ -8,6 +8,18 @@
</tr>
</thead>
<tbody>
+ <tr>
+ <td><h5>client.heartbeat.interval</h5></td>
+ <td style="word-wrap: break-word;">30000</td>
+ <td>Long</td>
+ <td>Time interval for job client to report its heartbeat when
'execution.attached' and 'execution.shutdown-on-attached-exit' are both true.
Cancel the job if timeout configured by 'client.heartbeat.timeout'.</td>
+ </tr>
+ <tr>
+ <td><h5>client.heartbeat.timeout</h5></td>
+ <td style="word-wrap: break-word;">180000</td>
+ <td>Long</td>
+ <td>Cancel the job if the dispatcher hasn't received the client's
heartbeat after timeout when 'execution.attached' and
'execution.shutdown-on-attached-exit' are both true.</td>
+ </tr>
<tr>
<td><h5>client.retry-period</h5></td>
<td style="word-wrap: break-word;">2 s</td>
diff --git
a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
index bf6febda792..e6a39a5f1a0 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
@@ -18,7 +18,9 @@
package org.apache.flink.client;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.client.cli.ClientOptions;
import org.apache.flink.client.program.ContextEnvironment;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.ProgramInvocationException;
@@ -27,6 +29,7 @@ import
org.apache.flink.client.program.rest.retry.ExponentialWaitStrategy;
import org.apache.flink.client.program.rest.retry.WaitStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.PipelineExecutorServiceLoader;
import org.apache.flink.runtime.client.JobInitializationException;
import org.apache.flink.runtime.jobmaster.JobResult;
@@ -42,7 +45,11 @@ import java.net.URL;
import java.net.URLClassLoader;
import java.util.List;
import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
/** Utility functions for Flink client. */
@@ -144,4 +151,38 @@ public enum ClientUtils {
throw new RuntimeException("Error while waiting for job to be
initialized", throwable);
}
}
+
+ /**
+ * The client reports the heartbeat to the dispatcher for aliveness.
+ *
+ * @param jobClient The job client.
+ * @param interval The heartbeat interval.
+ * @param timeout The heartbeat timeout.
+ * @return The ScheduledExecutorService which reports heartbeat
periodically.
+ */
+ public static ScheduledExecutorService reportHeartbeatPeriodically(
+ JobClient jobClient, long interval, long timeout) {
+ checkArgument(
+ interval < timeout,
+ "The client's heartbeat interval "
+ + "should be less than the heartbeat timeout. Please
adjust the param '"
+ + ClientOptions.CLIENT_HEARTBEAT_INTERVAL
+ + "' or '"
+ + ClientOptions.CLIENT_HEARTBEAT_TIMEOUT
+ + "'");
+
+ JobID jobID = jobClient.getJobID();
+ LOG.info("Begin to report client's heartbeat for the job {}.", jobID);
+
+ ScheduledExecutorService scheduledExecutor =
Executors.newSingleThreadScheduledExecutor();
+ scheduledExecutor.scheduleAtFixedRate(
+ () -> {
+ LOG.debug("Report client's heartbeat for the job {}.",
jobID);
+ jobClient.reportHeartbeat(System.currentTimeMillis() +
timeout);
+ },
+ interval,
+ interval,
+ TimeUnit.MILLISECONDS);
+ return scheduledExecutor;
+ }
}
diff --git
a/flink-clients/src/main/java/org/apache/flink/client/cli/ClientOptions.java
b/flink-clients/src/main/java/org/apache/flink/client/cli/ClientOptions.java
index 33cda70d91d..50bfb9f8fd0 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/ClientOptions.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/ClientOptions.java
@@ -20,9 +20,14 @@ package org.apache.flink.client.cli;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.configuration.description.Description;
+import org.apache.flink.configuration.description.TextElement;
import java.time.Duration;
+import static org.apache.flink.configuration.ConfigOptions.key;
+
/** Describes a client configuration parameter. */
@PublicEvolving
public class ClientOptions {
@@ -42,4 +47,35 @@ public class ClientOptions {
.withDescription(
"The interval (in ms) between consecutive retries
of failed attempts to execute "
+ "commands through the CLI or Flink's
clients, wherever retry is supported (default 2sec).");
+
+ /** Timeout for job client to report its heartbeat. */
+ public static final ConfigOption<Long> CLIENT_HEARTBEAT_TIMEOUT =
+ key("client.heartbeat.timeout")
+ .longType()
+ .defaultValue(180000L)
+ .withDescription(
+ Description.builder()
+ .text(
+ "Cancel the job if the dispatcher
hasn't received the client's"
+ + " heartbeat after
timeout when '%s' and '%s' are both true.",
+
TextElement.text(DeploymentOptions.ATTACHED.key()),
+ TextElement.text(
+
DeploymentOptions.SHUTDOWN_IF_ATTACHED.key()))
+ .build());
+
+ /** Time interval for job client to report its heartbeat. */
+ public static final ConfigOption<Long> CLIENT_HEARTBEAT_INTERVAL =
+ key("client.heartbeat.interval")
+ .longType()
+ .defaultValue(30000L)
+ .withDescription(
+ Description.builder()
+ .text(
+ "Time interval for job client to
report its heartbeat "
+ + "when '%s' and '%s' are
both true. Cancel the job if timeout configured by '%s'.",
+
TextElement.text(DeploymentOptions.ATTACHED.key()),
+ TextElement.text(
+
DeploymentOptions.SHUTDOWN_IF_ATTACHED.key()),
+
TextElement.text(CLIENT_HEARTBEAT_TIMEOUT.key()))
+ .build());
}
diff --git
a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientJobClientAdapter.java
b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientJobClientAdapter.java
index e776784e551..348d71d4163 100644
---
a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientJobClientAdapter.java
+++
b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientJobClientAdapter.java
@@ -142,6 +142,13 @@ public class ClusterClientJobClientAdapter<ClusterID>
clusterClient -> clusterClient.sendCoordinationRequest(jobID,
operatorId, request));
}
+ @Override
+ public void reportHeartbeat(long expiredTimestamp) {
+ bridgeClientRequest(
+ clusterClientProvider,
+ (clusterClient -> clusterClient.reportHeartbeat(jobID,
expiredTimestamp)));
+ }
+
private static <T> CompletableFuture<T> bridgeClientRequest(
ClusterClientProvider<?> clusterClientProvider,
Function<ClusterClient<?>, CompletableFuture<T>> resultRetriever) {
diff --git
a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/PipelineExecutorUtils.java
b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/PipelineExecutorUtils.java
index 76c5d67cc40..137dca5f2f6 100644
---
a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/PipelineExecutorUtils.java
+++
b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/PipelineExecutorUtils.java
@@ -21,8 +21,10 @@ package org.apache.flink.client.deployment.executors;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.dag.Pipeline;
import org.apache.flink.client.FlinkPipelineTranslationUtil;
+import org.apache.flink.client.cli.ClientOptions;
import org.apache.flink.client.cli.ExecutionConfigAccessor;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.PipelineOptionsInternal;
import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -66,6 +68,12 @@ public class PipelineExecutorUtils {
.getOptional(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID)
.ifPresent(strJobID ->
jobGraph.setJobID(JobID.fromHexString(strJobID)));
+ if (configuration.getBoolean(DeploymentOptions.ATTACHED)
+ &&
configuration.getBoolean(DeploymentOptions.SHUTDOWN_IF_ATTACHED)) {
+ jobGraph.setInitialClientHeartbeatTimeout(
+
configuration.getLong(ClientOptions.CLIENT_HEARTBEAT_TIMEOUT));
+ }
+
jobGraph.addJars(executionConfigAccessor.getJars());
jobGraph.setClasspaths(executionConfigAccessor.getClasspaths());
jobGraph.setSavepointRestoreSettings(executionConfigAccessor.getSavepointRestoreSettings());
diff --git
a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
index 0cbc27487d5..ff9bd87569d 100644
---
a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
+++
b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
@@ -31,6 +31,7 @@ import
org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.util.AbstractID;
import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.concurrent.FutureUtils;
import javax.annotation.Nullable;
@@ -206,4 +207,14 @@ public interface ClusterClient<T> extends AutoCloseable {
default CompletableFuture<Void> invalidateClusterDataset(AbstractID
clusterDatasetId) {
return CompletableFuture.completedFuture(null);
}
+
+ /**
+ * The client reports the heartbeat to the dispatcher for aliveness.
+ *
+ * @param jobId The jobId for the client and the job.
+ * @return
+ */
+ default CompletableFuture<Void> reportHeartbeat(JobID jobId, long
expiredTimestamp) {
+ return FutureUtils.completedVoidFuture();
+ }
}
diff --git
a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
index a48403606ad..4cd0d5aaf92 100644
---
a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
+++
b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
@@ -22,6 +22,8 @@ import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.ExecutionEnvironmentFactory;
+import org.apache.flink.client.ClientUtils;
+import org.apache.flink.client.cli.ClientOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.core.execution.DetachedJobExecutionResult;
@@ -37,6 +39,7 @@ import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -95,6 +98,7 @@ public class ContextEnvironment extends ExecutionEnvironment {
CompletableFuture<JobExecutionResult> jobExecutionResultFuture =
jobClient.getJobExecutionResult();
+ ScheduledExecutorService clientHeartbeatService = null;
if
(getConfiguration().getBoolean(DeploymentOptions.SHUTDOWN_IF_ATTACHED)) {
Thread shutdownHook =
ShutdownHookUtil.addShutdownHook(
@@ -112,9 +116,17 @@ public class ContextEnvironment extends
ExecutionEnvironment {
shutdownHook,
ContextEnvironment.class.getSimpleName(),
LOG));
+ clientHeartbeatService =
+ ClientUtils.reportHeartbeatPeriodically(
+ jobClient,
+
getConfiguration().getLong(ClientOptions.CLIENT_HEARTBEAT_INTERVAL),
+
getConfiguration().getLong(ClientOptions.CLIENT_HEARTBEAT_TIMEOUT));
}
jobExecutionResult = jobExecutionResultFuture.get();
+ if (clientHeartbeatService != null) {
+ clientHeartbeatService.shutdown();
+ }
if (!suppressSysout) {
System.out.println(jobExecutionResult);
}
diff --git
a/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
b/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
index 80b949daf6f..121b5dd2e64 100644
---
a/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
+++
b/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
@@ -195,6 +195,11 @@ public class MiniClusterClient implements
ClusterClient<MiniClusterClient.MiniCl
return miniCluster.invalidateClusterDataset(new
IntermediateDataSetID(clusterDatasetId));
}
+ @Override
+ public CompletableFuture<Void> reportHeartbeat(JobID jobId, long
expiredTimestamp) {
+ return miniCluster.reportHeartbeat(jobId, expiredTimestamp);
+ }
+
/** The type of the Cluster ID for the local {@link MiniCluster}. */
public enum MiniClusterId {
INSTANCE
diff --git
a/flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java
b/flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java
index efe5ada5912..9e7287c2afb 100644
---
a/flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java
+++
b/flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java
@@ -22,6 +22,8 @@ import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.client.ClientUtils;
+import org.apache.flink.client.cli.ClientOptions;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
@@ -50,6 +52,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -140,6 +143,7 @@ public class StreamContextEnvironment extends
StreamExecutionEnvironment {
CompletableFuture<JobExecutionResult> jobExecutionResultFuture =
jobClient.getJobExecutionResult();
+ ScheduledExecutorService clientHeartbeatService = null;
if
(configuration.getBoolean(DeploymentOptions.SHUTDOWN_IF_ATTACHED)) {
Thread shutdownHook =
ShutdownHookUtil.addShutdownHook(
@@ -157,9 +161,17 @@ public class StreamContextEnvironment extends
StreamExecutionEnvironment {
shutdownHook,
StreamContextEnvironment.class.getSimpleName(),
LOG));
+ clientHeartbeatService =
+ ClientUtils.reportHeartbeatPeriodically(
+ jobClient,
+
configuration.getLong(ClientOptions.CLIENT_HEARTBEAT_INTERVAL),
+
configuration.getLong(ClientOptions.CLIENT_HEARTBEAT_TIMEOUT));
}
jobExecutionResult = jobExecutionResultFuture.get();
+ if (clientHeartbeatService != null) {
+ clientHeartbeatService.shutdown();
+ }
if (!suppressSysout) {
System.out.println(jobExecutionResult);
}
diff --git
a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
index 6ec95eba137..a05350d5e58 100644
---
a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
+++
b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
@@ -58,6 +58,9 @@ import
org.apache.flink.runtime.rest.messages.JobAccumulatorsInfo;
import org.apache.flink.runtime.rest.messages.JobAccumulatorsMessageParameters;
import org.apache.flink.runtime.rest.messages.JobCancellationHeaders;
import org.apache.flink.runtime.rest.messages.JobCancellationMessageParameters;
+import org.apache.flink.runtime.rest.messages.JobClientHeartbeatHeaders;
+import org.apache.flink.runtime.rest.messages.JobClientHeartbeatParameters;
+import org.apache.flink.runtime.rest.messages.JobClientHeartbeatRequestBody;
import org.apache.flink.runtime.rest.messages.JobMessageParameters;
import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
@@ -727,6 +730,18 @@ public class RestClusterClient<T> implements
ClusterClient<T> {
});
}
+ @Override
+ public CompletableFuture<Void> reportHeartbeat(JobID jobId, long
expiredTimestamp) {
+ JobClientHeartbeatParameters params =
+ new JobClientHeartbeatParameters().resolveJobId(jobId);
+ CompletableFuture<EmptyResponseBody> responseFuture =
+ sendRequest(
+ JobClientHeartbeatHeaders.getInstance(),
+ params,
+ new JobClientHeartbeatRequestBody(expiredTimestamp));
+ return responseFuture.thenApply(ignore -> null);
+ }
+
@Override
public void shutDownCluster() {
try {
diff --git
a/flink-clients/src/test/java/org/apache/flink/client/ClientHeartbeatTest.java
b/flink-clients/src/test/java/org/apache/flink/client/ClientHeartbeatTest.java
new file mode 100644
index 00000000000..25bb9fbae9b
--- /dev/null
+++
b/flink-clients/src/test/java/org/apache/flink/client/ClientHeartbeatTest.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.client.program.PerJobMiniClusterFactory;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.testutils.WaitingCancelableInvokable;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for client's heartbeat. */
+public class ClientHeartbeatTest {
+ private final long clientHeartbeatInterval = 50;
+ private final long clientHeartbeatTimeout = 500;
+
+ private MiniCluster miniCluster;
+
+ @AfterEach
+ void teardown() throws Exception {
+ if (miniCluster != null) {
+ miniCluster.close();
+ }
+ }
+
+ @Test
+ void testJobCancelledIfClientHeartbeatTimeout() throws Exception {
+ JobClient jobClient = submitJob(createConfiguration(true));
+
+ // The client doesn't report heartbeat to the dispatcher.
+
+ assertThat(jobClient.getJobExecutionResult())
+ .failsWithin(Duration.ofSeconds(1))
+ .withThrowableOfType(ExecutionException.class)
+ .withMessageContaining("Job was cancelled");
+
+ assertThat(miniCluster.isRunning()).isFalse();
+ }
+
+ @Test
+ void testJobRunningIfClientReportHeartbeat() throws Exception {
+ JobClient jobClient = submitJob(createConfiguration(true));
+
+ // The client reports heartbeat to the dispatcher.
+ ScheduledExecutorService heartbeatService =
+ ClientUtils.reportHeartbeatPeriodically(
+ jobClient, clientHeartbeatInterval,
clientHeartbeatTimeout);
+
+ Thread.sleep(2 * clientHeartbeatTimeout);
+
assertThat(jobClient.getJobStatus().get()).isEqualTo(JobStatus.RUNNING);
+
+ heartbeatService.shutdown();
+ }
+
+ @Test
+ void testJobRunningIfDisableClientHeartbeat() throws Exception {
+ JobClient jobClient = submitJob(createConfiguration(false));
+
+ Thread.sleep(2 * clientHeartbeatTimeout);
+
assertThat(jobClient.getJobStatus().get()).isEqualTo(JobStatus.RUNNING);
+ }
+
+ private Configuration createConfiguration(boolean shutdownOnAttachedExit) {
+ Configuration configuration = new Configuration();
+ if (shutdownOnAttachedExit) {
+ configuration.setBoolean(DeploymentOptions.ATTACHED, true);
+ configuration.setBoolean(DeploymentOptions.SHUTDOWN_IF_ATTACHED,
true);
+ }
+ return configuration;
+ }
+
+ // The dispatcher deals with client heartbeat only when
shutdownOnAttachedExit is true;
+ private JobClient submitJob(Configuration configuration) throws Exception {
+ PerJobMiniClusterFactory perJobMiniClusterFactory =
+ PerJobMiniClusterFactory.createWithFactory(
+ configuration,
+ config -> {
+ miniCluster = new MiniCluster(config);
+ return miniCluster;
+ });
+
+ JobGraph cancellableJobGraph = getCancellableJobGraph();
+ // Enable heartbeat only when both execution.attached and
+ // execution.shutdown-on-attached-exit are true.
+ if (configuration.getBoolean(DeploymentOptions.ATTACHED)
+ &&
configuration.getBoolean(DeploymentOptions.SHUTDOWN_IF_ATTACHED)) {
+
cancellableJobGraph.setInitialClientHeartbeatTimeout(clientHeartbeatTimeout);
+ }
+ return perJobMiniClusterFactory
+ .submitJob(cancellableJobGraph,
ClassLoader.getSystemClassLoader())
+ .get();
+ }
+
+ private static JobGraph getCancellableJobGraph() {
+ JobVertex jobVertex = new JobVertex("jobVertex");
+ jobVertex.setInvokableClass(WaitingCancelableInvokable.class);
+ jobVertex.setParallelism(1);
+ return JobGraphTestUtils.streamingJobGraph(jobVertex);
+ }
+}
diff --git
a/flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java
b/flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java
index 0febcbc1cab..a0411937395 100644
--- a/flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java
+++ b/flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java
@@ -114,4 +114,7 @@ public interface JobClient {
/** Returns the {@link JobExecutionResult result of the job execution} of
the submitted job. */
CompletableFuture<JobExecutionResult> getJobExecutionResult();
+
+ /** The client reports the heartbeat to the dispatcher for aliveness. */
+ default void reportHeartbeat(long expiredTimestamp) {}
}
diff --git a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
index de462b9f204..d4b2937e1c6 100644
--- a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
+++ b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
@@ -1838,6 +1838,32 @@
}
}
}
+ }, {
+ "url" : "/jobs/:jobid/clientHeartbeat",
+ "method" : "PATCH",
+ "status-code" : "202 Accepted",
+ "file-upload" : false,
+ "path-parameters" : {
+ "pathParameters" : [ {
+ "key" : "jobid"
+ } ]
+ },
+ "query-parameters" : {
+ "queryParameters" : [ ]
+ },
+ "request" : {
+ "type" : "object",
+ "id" :
"urn:jsonschema:org:apache:flink:runtime:rest:messages:JobClientHeartbeatRequestBody",
+ "properties" : {
+ "expiredTimestamp" : {
+ "type" : "integer"
+ }
+ }
+ },
+ "response" : {
+ "type" : "object",
+ "id" :
"urn:jsonschema:org:apache:flink:runtime:rest:messages:EmptyResponseBody"
+ }
}, {
"url" : "/jobs/:jobid/config",
"method" : "GET",
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index cfa36bad1df..83c493a2ad4 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -30,6 +30,7 @@ import org.apache.flink.configuration.ClusterOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.PipelineOptions;
+import org.apache.flink.configuration.WebOptions;
import org.apache.flink.core.execution.CheckpointType;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.metrics.MetricGroup;
@@ -101,6 +102,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -108,6 +110,8 @@ import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -125,6 +129,8 @@ public abstract class Dispatcher extends
FencedRpcEndpoint<DispatcherId>
private static final int INITIAL_JOB_MANAGER_RUNNER_REGISTRY_CAPACITY = 16;
+ private static final long MAX_JOB_CLIENT_ALIVENESS_CHECK_INTERVAL = 60_000;
+
private final Configuration configuration;
private final JobGraphWriter jobGraphWriter;
@@ -170,6 +176,10 @@ public abstract class Dispatcher extends
FencedRpcEndpoint<DispatcherId>
private final ResourceCleaner localResourceCleaner;
private final ResourceCleaner globalResourceCleaner;
+ private final Time webTimeout;
+ private final Map<JobID, Long> jobClientExpiredTimestamp = new HashMap<>();
+ private ScheduledFuture<?> jobClientAlivenessCheck;
+
/** Enum to distinguish between initial job submission and re-submission
for recovery. */
protected enum ExecutionType {
SUBMISSION,
@@ -282,6 +292,8 @@ public abstract class Dispatcher extends
FencedRpcEndpoint<DispatcherId>
resourceCleanerFactory.createLocalResourceCleaner(this.getMainThreadExecutor());
this.globalResourceCleaner =
resourceCleanerFactory.createGlobalResourceCleaner(this.getMainThreadExecutor());
+
+ this.webTimeout =
Time.milliseconds(configuration.getLong(WebOptions.TIMEOUT));
}
// ------------------------------------------------------
@@ -354,6 +366,9 @@ public abstract class Dispatcher extends
FencedRpcEndpoint<DispatcherId>
private void runRecoveredJob(final JobGraph recoveredJob) {
checkNotNull(recoveredJob);
+
+ initJobClientExpiredTime(recoveredJob);
+
try {
runJob(createJobMasterRunner(recoveredJob),
ExecutionType.RECOVERY);
} catch (Throwable throwable) {
@@ -365,6 +380,33 @@ public abstract class Dispatcher extends
FencedRpcEndpoint<DispatcherId>
}
}
+ private void initJobClientExpiredTime(JobGraph jobGraph) {
+ JobID jobID = jobGraph.getJobID();
+ long initialClientHeartbeatTimeout =
jobGraph.getInitialClientHeartbeatTimeout();
+ if (initialClientHeartbeatTimeout > 0) {
+ log.info(
+ "Begin to detect the client's aliveness for job {}. The
heartbeat timeout is {}",
+ jobID,
+ initialClientHeartbeatTimeout);
+ jobClientExpiredTimestamp.put(
+ jobID, System.currentTimeMillis() +
initialClientHeartbeatTimeout);
+
+ if (jobClientAlivenessCheck == null) {
+ // Use the client heartbeat timeout as the check interval.
+ jobClientAlivenessCheck =
+ this.getRpcService()
+ .getScheduledExecutor()
+ .scheduleWithFixedDelay(
+ () ->
+ getMainThreadExecutor()
+
.execute(this::checkJobClientAliveness),
+ 0L,
+ initialClientHeartbeatTimeout,
+ TimeUnit.MILLISECONDS);
+ }
+ }
+ }
+
private void startCleanupRetries() {
recoveredDirtyJobs.forEach(this::runCleanupRetry);
recoveredDirtyJobs.clear();
@@ -399,6 +441,11 @@ public abstract class Dispatcher extends
FencedRpcEndpoint<DispatcherId>
public CompletableFuture<Void> onStop() {
log.info("Stopping dispatcher {}.", getAddress());
+ if (jobClientAlivenessCheck != null) {
+ jobClientAlivenessCheck.cancel(false);
+ jobClientAlivenessCheck = null;
+ }
+
final CompletableFuture<Void> allJobsTerminationFuture =
terminateRunningJobsAndGetTerminationFuture();
@@ -570,6 +617,7 @@ public abstract class Dispatcher extends
FencedRpcEndpoint<DispatcherId>
private void persistAndRunJob(JobGraph jobGraph) throws Exception {
jobGraphWriter.putJobGraph(jobGraph);
+ initJobClientExpiredTime(jobGraph);
runJob(createJobMasterRunner(jobGraph), ExecutionType.SUBMISSION);
}
@@ -1016,6 +1064,43 @@ public abstract class Dispatcher extends
FencedRpcEndpoint<DispatcherId>
operatorId, serializedRequest, timeout));
}
+ @Override
+ public CompletableFuture<Void> reportJobClientHeartbeat(
+ JobID jobId, long expiredTimestamp, Time timeout) {
+ if (!getJobManagerRunner(jobId).isPresent()) {
+ log.warn("Fail to find job {} for client.", jobId);
+ } else {
+ log.debug(
+ "Job {} receives client's heartbeat which expiredTimestamp
is {}.",
+ jobId,
+ expiredTimestamp);
+ jobClientExpiredTimestamp.put(jobId, expiredTimestamp);
+ }
+ return FutureUtils.completedVoidFuture();
+ }
+
+ private void checkJobClientAliveness() {
+ long currentTimestamp = System.currentTimeMillis();
+ Iterator<Map.Entry<JobID, Long>> iterator =
jobClientExpiredTimestamp.entrySet().iterator();
+ while (iterator.hasNext()) {
+ Map.Entry<JobID, Long> entry = iterator.next();
+ JobID jobID = entry.getKey();
+ long expiredTimestamp = entry.getValue();
+
+ if (!getJobManagerRunner(jobID).isPresent()) {
+ iterator.remove();
+ } else if (expiredTimestamp <= currentTimestamp) {
+ log.warn(
+ "The heartbeat from the job client is timeout and
cancel the job {}. "
+ + "You can adjust the heartbeat interval "
+ + "by 'client.heartbeat.interval' and the
timeout "
+ + "by 'client.heartbeat.timeout'",
+ jobID);
+ cancelJob(jobID, webTimeout);
+ }
+ }
+ }
+
private void registerJobManagerRunnerTerminationFuture(
JobID jobId, CompletableFuture<Void>
jobManagerRunnerTerminationFuture) {
Preconditions.checkState(!jobManagerRunnerTerminationFutures.containsKey(jobId));
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
index bb821adda7b..7a739a40289 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
@@ -70,6 +70,8 @@ public class JobGraph implements Serializable {
private static final long serialVersionUID = 1L;
+ /** Key under which the initial client heartbeat timeout is stored in the job configuration. */
+ private static final String INITIAL_CLIENT_HEARTBEAT_TIMEOUT =
"initialClientHeartbeatTimeout";
+
// --- job and configuration ---
/** List of task vertices included in this job graph. */
@@ -652,4 +654,12 @@ public class JobGraph implements Serializable {
public List<JobStatusHook> getJobStatusHooks() {
return this.jobStatusHooks;
}
+
+ /**
+ * Stores the initial client heartbeat timeout in this job's configuration.
+ *
+ * <p>NOTE(review): the unit is not visible here -- presumably milliseconds; confirm against
+ * the caller that sets it.
+ */
+ public void setInitialClientHeartbeatTimeout(long
initialClientHeartbeatTimeout) {
+ jobConfiguration.setLong(INITIAL_CLIENT_HEARTBEAT_TIMEOUT,
initialClientHeartbeatTimeout);
+ }
+
+ /**
+ * Returns the initial client heartbeat timeout stored in this job's configuration, or {@code
+ * Long.MIN_VALUE} if it has never been set.
+ */
+ public long getInitialClientHeartbeatTimeout() {
+ return jobConfiguration.getLong(INITIAL_CLIENT_HEARTBEAT_TIMEOUT,
Long.MIN_VALUE);
+ }
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 98614e3ecc9..59240a14c03 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -1341,6 +1341,13 @@ public class MiniCluster implements AutoCloseableAsync {
metaInfoMap -> new
HashSet<>(metaInfoMap.keySet())));
}
+    /**
+     * Forwards a job client's heartbeat to the dispatcher.
+     *
+     * @param jobId the job whose client sends the heartbeat
+     * @param expiredTimestamp the client's deadline, passed unchanged to the dispatcher
+     * @return the future returned by the dispatcher's {@code reportJobClientHeartbeat}
+     */
+    public CompletableFuture<Void> reportHeartbeat(JobID jobId, long expiredTimestamp) {
+        return runDispatcherCommand(
+                gateway -> gateway.reportJobClientHeartbeat(jobId, expiredTimestamp, rpcTimeout));
+    }
+
/** Internal factory for {@link RpcService}. */
protected interface RpcServiceFactory {
RpcService createRpcService() throws Exception;
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobClient.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobClient.java
index 6310b62b34c..cee75957f1e 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobClient.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobClient.java
@@ -158,6 +158,11 @@ public final class MiniClusterJobClient implements
JobClient, CoordinationReques
}
}
+ /**
+ * Forwards this job's client heartbeat to the {@link MiniCluster}.
+ *
+ * <p>The future returned by {@code miniCluster.reportHeartbeat} is not awaited, so a failed
+ * report is not surfaced to the caller.
+ */
+ @Override
+ public void reportHeartbeat(long expiredTimestamp) {
+ miniCluster.reportHeartbeat(jobID, expiredTimestamp);
+ }
+
private static void shutDownCluster(MiniCluster miniCluster) {
miniCluster
.closeAsync()
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobClientHeartbeatHandler.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobClientHeartbeatHandler.java
new file mode 100644
index 00000000000..2487b4bdf2c
--- /dev/null
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobClientHeartbeatHandler.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.JobClientHeartbeatParameters;
+import org.apache.flink.runtime.rest.messages.JobClientHeartbeatRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+
+/**
+ * REST handler that receives a job client's heartbeat and forwards it to the {@link
+ * RestfulGateway}.
+ *
+ * <p>The job is taken from the {@link JobIDPathParameter} and the deadline from the {@link
+ * JobClientHeartbeatRequestBody}. A gateway failure is logged and reported to the caller as a
+ * {@link RestHandlerException} with status {@code 500 Internal Server Error}.
+ */
+public class JobClientHeartbeatHandler
+        extends AbstractRestHandler<
+                RestfulGateway,
+                JobClientHeartbeatRequestBody,
+                EmptyResponseBody,
+                JobClientHeartbeatParameters> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JobClientHeartbeatHandler.class);
+
+    public JobClientHeartbeatHandler(
+            GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+            Time timeout,
+            Map<String, String> headers,
+            MessageHeaders<
+                            JobClientHeartbeatRequestBody,
+                            EmptyResponseBody,
+                            JobClientHeartbeatParameters>
+                    messageHeaders) {
+        super(leaderRetriever, timeout, headers, messageHeaders);
+    }
+
+    @Override
+    public CompletableFuture<EmptyResponseBody> handleRequest(
+            HandlerRequest<JobClientHeartbeatRequestBody> request, RestfulGateway gateway)
+            throws RestHandlerException {
+        return gateway.reportJobClientHeartbeat(
+                        request.getPathParameter(JobIDPathParameter.class),
+                        request.getRequestBody().getExpiredTimestamp(),
+                        timeout)
+                .handle(JobClientHeartbeatHandler::toResponse);
+    }
+
+    /**
+     * Maps the gateway's reply to the REST response. On failure, it logs the error and throws it
+     * as a {@link RestHandlerException} with status {@code 500}, wrapped in a {@link
+     * CompletionException}.
+     */
+    private static EmptyResponseBody toResponse(Void ack, Throwable error) {
+        if (error == null) {
+            return EmptyResponseBody.getInstance();
+        }
+        final String errorMessage = "Fail to report jobClient's heartbeat: " + error.getMessage();
+        LOG.error(errorMessage, error);
+        throw new CompletionException(
+                new RestHandlerException(
+                        errorMessage, HttpResponseStatus.INTERNAL_SERVER_ERROR, error));
+    }
+}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobClientHeartbeatHeaders.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobClientHeartbeatHeaders.java
new file mode 100644
index 00000000000..0bb3e7f3d94
--- /dev/null
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobClientHeartbeatHeaders.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.handler.job.JobClientHeartbeatHandler;
+
+import
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * Message headers for the {@link JobClientHeartbeatHandler}.
+ *
+ * <p>Describes a {@code PATCH} request to {@value #URL} that carries a {@link
+ * JobClientHeartbeatRequestBody}. A successful call is answered with {@code 202 Accepted} and an
+ * {@link EmptyResponseBody}.
+ */
+public class JobClientHeartbeatHeaders
+        implements RuntimeMessageHeaders<
+                JobClientHeartbeatRequestBody, EmptyResponseBody, JobClientHeartbeatParameters> {
+
+    public static final String URL = "/jobs/:jobid/clientHeartbeat";
+
+    private static final JobClientHeartbeatHeaders INSTANCE = new JobClientHeartbeatHeaders();
+
+    private JobClientHeartbeatHeaders() {}
+
+    /** Returns the shared instance; the class holds no per-instance state. */
+    public static JobClientHeartbeatHeaders getInstance() {
+        return INSTANCE;
+    }
+
+    @Override
+    public HttpMethodWrapper getHttpMethod() {
+        return HttpMethodWrapper.PATCH;
+    }
+
+    @Override
+    public String getTargetRestEndpointURL() {
+        return URL;
+    }
+
+    @Override
+    public JobClientHeartbeatParameters getUnresolvedMessageParameters() {
+        return new JobClientHeartbeatParameters();
+    }
+
+    @Override
+    public Class<JobClientHeartbeatRequestBody> getRequestClass() {
+        return JobClientHeartbeatRequestBody.class;
+    }
+
+    @Override
+    public Class<EmptyResponseBody> getResponseClass() {
+        return EmptyResponseBody.class;
+    }
+
+    @Override
+    public HttpResponseStatus getResponseStatusCode() {
+        return HttpResponseStatus.ACCEPTED;
+    }
+
+    @Override
+    public String getDescription() {
+        return "Report the jobClient's aliveness.";
+    }
+}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobClientHeartbeatParameters.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobClientHeartbeatParameters.java
new file mode 100644
index 00000000000..96aaa9ef492
--- /dev/null
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobClientHeartbeatParameters.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.api.common.JobID;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * Message parameters for reporting a job client heartbeat: a single {@link JobIDPathParameter}
+ * that identifies the job, and no query parameters.
+ */
+public class JobClientHeartbeatParameters extends MessageParameters {
+
+    private final JobIDPathParameter jobIdPathParameter = new JobIDPathParameter();
+
+    /**
+     * Resolves the job id path parameter.
+     *
+     * @param jobId the job the heartbeat is reported for
+     * @return this instance, for chaining
+     */
+    public JobClientHeartbeatParameters resolveJobId(JobID jobId) {
+        jobIdPathParameter.resolve(jobId);
+        return this;
+    }
+
+    @Override
+    public Collection<MessagePathParameter<?>> getPathParameters() {
+        return Collections.singleton(jobIdPathParameter);
+    }
+
+    @Override
+    public Collection<MessageQueryParameter<?>> getQueryParameters() {
+        return Collections.emptyList();
+    }
+}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobClientHeartbeatRequestBody.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobClientHeartbeatRequestBody.java
new file mode 100644
index 00000000000..d38cd5405c7
--- /dev/null
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobClientHeartbeatRequestBody.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+/** {@link RequestBody} to report heartbeat for client. */
+public class JobClientHeartbeatRequestBody implements RequestBody {
+
+    private static final String EXPIRED_TIMESTAMP = "expiredTimestamp";
+
+    @JsonProperty(EXPIRED_TIMESTAMP)
+    private final long expiredTimestamp;
+
+    /**
+     * Creates the request body.
+     *
+     * <p>The creator parameter is explicitly bound to the {@code "expiredTimestamp"} JSON
+     * property. Without a parameter name, Jackson cannot bind it and treats a single-argument
+     * {@code @JsonCreator} as a delegating creator, which rejects the JSON object body.
+     *
+     * @param expiredTimestamp the client's deadline after which it is considered gone
+     */
+    @JsonCreator
+    public JobClientHeartbeatRequestBody(@JsonProperty(EXPIRED_TIMESTAMP) long expiredTimestamp) {
+        this.expiredTimestamp = expiredTimestamp;
+    }
+
+    /** Returns the deadline reported by the client. */
+    @JsonIgnore
+    public long getExpiredTimestamp() {
+        return expiredTimestamp;
+    }
+}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
index 4f9085cda2e..4a7cf5dddcb 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
@@ -44,6 +44,7 @@ import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcTimeout;
import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.concurrent.FutureUtils;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
@@ -271,4 +272,10 @@ public interface RestfulGateway extends RpcGateway {
@RpcTimeout Time timeout) {
throw new UnsupportedOperationException();
}
+
+    /**
+     * The client reports the heartbeat to the dispatcher for aliveness.
+     *
+     * <p>The default implementation ignores the heartbeat and returns an already completed future.
+     *
+     * @param jobId the job whose client sends the heartbeat
+     * @param expiredTimestamp the client's deadline after which it is considered gone (presumably
+     *     epoch milliseconds -- confirm with the client side)
+     * @param timeout timeout for the call; marked {@link RpcTimeout} like the other gateway
+     *     methods, so that the RPC layer honours it instead of falling back to its default
+     * @return future completed once the heartbeat has been handled
+     */
+    default CompletableFuture<Void> reportJobClientHeartbeat(
+            JobID jobId, long expiredTimestamp, @RpcTimeout Time timeout) {
+        return FutureUtils.completedVoidFuture();
+    }
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
index 1ab048995aa..7836d720237 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
@@ -46,6 +46,7 @@ import
org.apache.flink.runtime.rest.handler.dataset.ClusterDataSetListHandler;
import org.apache.flink.runtime.rest.handler.job.GeneratedLogUrlHandler;
import org.apache.flink.runtime.rest.handler.job.JobAccumulatorsHandler;
import org.apache.flink.runtime.rest.handler.job.JobCancellationHandler;
+import org.apache.flink.runtime.rest.handler.job.JobClientHeartbeatHandler;
import org.apache.flink.runtime.rest.handler.job.JobConfigHandler;
import org.apache.flink.runtime.rest.handler.job.JobDetailsHandler;
import org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler;
@@ -101,6 +102,7 @@ import
org.apache.flink.runtime.rest.messages.ClusterOverviewHeaders;
import org.apache.flink.runtime.rest.messages.DashboardConfigurationHeaders;
import org.apache.flink.runtime.rest.messages.JobAccumulatorsHeaders;
import org.apache.flink.runtime.rest.messages.JobCancellationHeaders;
+import org.apache.flink.runtime.rest.messages.JobClientHeartbeatHeaders;
import org.apache.flink.runtime.rest.messages.JobConfigHeaders;
import org.apache.flink.runtime.rest.messages.JobExceptionsHeaders;
import
org.apache.flink.runtime.rest.messages.JobIdsWithStatusesOverviewHeaders;
@@ -692,6 +694,13 @@ public class WebMonitorEndpoint<T extends RestfulGateway>
extends RestServerEndp
new ShutdownHandler(
leaderRetriever, timeout, responseHeaders,
ShutdownHeaders.getInstance());
+ final JobClientHeartbeatHandler jobClientHeartbeatHandler =
+ new JobClientHeartbeatHandler(
+ leaderRetriever,
+ timeout,
+ responseHeaders,
+ JobClientHeartbeatHeaders.getInstance());
+
final File webUiDir = restConfiguration.getWebUiDir();
Optional<StaticFileServerHandler<T>> optWebContent;
@@ -878,6 +887,10 @@ public class WebMonitorEndpoint<T extends RestfulGateway>
extends RestServerEndp
handlers.add(Tuple2.of(shutdownHandler.getMessageHeaders(),
shutdownHandler));
+ handlers.add(
+ Tuple2.of(
+ jobClientHeartbeatHandler.getMessageHeaders(),
jobClientHeartbeatHandler));
+
optWebContent.ifPresent(
webContent -> {
handlers.add(