This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new be240dea803 [FLINK-29640] Enhance the function configured by execution.shutdown-on-attached-exit by heartbeat between client and dispatcher
be240dea803 is described below

commit be240dea803fc82299c4711ebe656c1c4b159ca9
Author: liujiangang <[email protected]>
AuthorDate: Fri Nov 18 17:28:52 2022 +0800

    [FLINK-29640] Enhance the function configured by execution.shutdown-on-attached-exit by heartbeat between client and dispatcher
    
    This closes #21347
---
 .../shortcodes/generated/client_configuration.html |  12 ++
 .../java/org/apache/flink/client/ClientUtils.java  |  41 +++++++
 .../org/apache/flink/client/cli/ClientOptions.java |  36 ++++++
 .../deployment/ClusterClientJobClientAdapter.java  |   7 ++
 .../executors/PipelineExecutorUtils.java           |   8 ++
 .../apache/flink/client/program/ClusterClient.java |  11 ++
 .../flink/client/program/ContextEnvironment.java   |  12 ++
 .../flink/client/program/MiniClusterClient.java    |   5 +
 .../client/program/StreamContextEnvironment.java   |  12 ++
 .../client/program/rest/RestClusterClient.java     |  15 +++
 .../apache/flink/client/ClientHeartbeatTest.java   | 129 +++++++++++++++++++++
 .../org/apache/flink/core/execution/JobClient.java |   3 +
 .../src/test/resources/rest_api_v1.snapshot        |  26 +++++
 .../flink/runtime/dispatcher/Dispatcher.java       |  85 ++++++++++++++
 .../apache/flink/runtime/jobgraph/JobGraph.java    |  10 ++
 .../flink/runtime/minicluster/MiniCluster.java     |   7 ++
 .../runtime/minicluster/MiniClusterJobClient.java  |   5 +
 .../handler/job/JobClientHeartbeatHandler.java     |  88 ++++++++++++++
 .../rest/messages/JobClientHeartbeatHeaders.java   |  75 ++++++++++++
 .../messages/JobClientHeartbeatParameters.java     |  45 +++++++
 .../messages/JobClientHeartbeatRequestBody.java    |  41 +++++++
 .../flink/runtime/webmonitor/RestfulGateway.java   |   7 ++
 .../runtime/webmonitor/WebMonitorEndpoint.java     |  13 +++
 23 files changed, 693 insertions(+)

diff --git a/docs/layouts/shortcodes/generated/client_configuration.html b/docs/layouts/shortcodes/generated/client_configuration.html
index e292d794f1c..1af59376474 100644
--- a/docs/layouts/shortcodes/generated/client_configuration.html
+++ b/docs/layouts/shortcodes/generated/client_configuration.html
@@ -8,6 +8,18 @@
         </tr>
     </thead>
     <tbody>
+        <tr>
+            <td><h5>client.heartbeat.interval</h5></td>
+            <td style="word-wrap: break-word;">30000</td>
+            <td>Long</td>
+            <td>Time interval for job client to report its heartbeat when 'execution.attached' and 'execution.shutdown-on-attached-exit' are both true. Cancel the job if timeout configured by 'client.heartbeat.timeout'.</td>
+        </tr>
+        <tr>
+            <td><h5>client.heartbeat.timeout</h5></td>
+            <td style="word-wrap: break-word;">180000</td>
+            <td>Long</td>
+            <td>Cancel the job if the dispatcher hasn't received the client's heartbeat after timeout when 'execution.attached' and 'execution.shutdown-on-attached-exit' are both true.</td>
+        </tr>
         <tr>
             <td><h5>client.retry-period</h5></td>
             <td style="word-wrap: break-word;">2 s</td>
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java 
b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
index bf6febda792..e6a39a5f1a0 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
@@ -18,7 +18,9 @@
 
 package org.apache.flink.client;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.client.cli.ClientOptions;
 import org.apache.flink.client.program.ContextEnvironment;
 import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.client.program.ProgramInvocationException;
@@ -27,6 +29,7 @@ import 
org.apache.flink.client.program.rest.retry.ExponentialWaitStrategy;
 import org.apache.flink.client.program.rest.retry.WaitStrategy;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.core.execution.PipelineExecutorServiceLoader;
 import org.apache.flink.runtime.client.JobInitializationException;
 import org.apache.flink.runtime.jobmaster.JobResult;
@@ -42,7 +45,11 @@ import java.net.URL;
 import java.net.URLClassLoader;
 import java.util.List;
 import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /** Utility functions for Flink client. */
@@ -144,4 +151,38 @@ public enum ClientUtils {
             throw new RuntimeException("Error while waiting for job to be 
initialized", throwable);
         }
     }
+
+    /**
+     * The client reports the heartbeat to the dispatcher for aliveness.
+     *
+     * @param jobClient The job client.
+     * @param interval The heartbeat interval.
+     * @param timeout The heartbeat timeout.
+     * @return The ScheduledExecutorService which reports heartbeat periodically.
+     */
+    public static ScheduledExecutorService reportHeartbeatPeriodically(
+            JobClient jobClient, long interval, long timeout) {
+        checkArgument(
+                interval < timeout,
+                "The client's heartbeat interval "
+                        + "should be less than the heartbeat timeout. Please 
adjust the param '"
+                        + ClientOptions.CLIENT_HEARTBEAT_INTERVAL
+                        + "' or '"
+                        + ClientOptions.CLIENT_HEARTBEAT_TIMEOUT
+                        + "'");
+
+        JobID jobID = jobClient.getJobID();
+        LOG.info("Begin to report client's heartbeat for the job {}.", jobID);
+
+        ScheduledExecutorService scheduledExecutor = 
Executors.newSingleThreadScheduledExecutor();
+        scheduledExecutor.scheduleAtFixedRate(
+                () -> {
+                    LOG.debug("Report client's heartbeat for the job {}.", 
jobID);
+                    jobClient.reportHeartbeat(System.currentTimeMillis() + 
timeout);
+                },
+                interval,
+                interval,
+                TimeUnit.MILLISECONDS);
+        return scheduledExecutor;
+    }
 }
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/cli/ClientOptions.java 
b/flink-clients/src/main/java/org/apache/flink/client/cli/ClientOptions.java
index 33cda70d91d..50bfb9f8fd0 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/ClientOptions.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/ClientOptions.java
@@ -20,9 +20,14 @@ package org.apache.flink.client.cli;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.configuration.description.Description;
+import org.apache.flink.configuration.description.TextElement;
 
 import java.time.Duration;
 
+import static org.apache.flink.configuration.ConfigOptions.key;
+
 /** Describes a client configuration parameter. */
 @PublicEvolving
 public class ClientOptions {
@@ -42,4 +47,35 @@ public class ClientOptions {
                     .withDescription(
                             "The interval (in ms) between consecutive retries 
of failed attempts to execute "
                                     + "commands through the CLI or Flink's 
clients, wherever retry is supported (default 2sec).");
+
+    /** Timeout for job client to report its heartbeat. */
+    public static final ConfigOption<Long> CLIENT_HEARTBEAT_TIMEOUT =
+            key("client.heartbeat.timeout")
+                    .longType()
+                    .defaultValue(180000L)
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "Cancel the job if the dispatcher 
hasn't received the client's"
+                                                    + " heartbeat after 
timeout when '%s' and '%s' are both true.",
+                                            
TextElement.text(DeploymentOptions.ATTACHED.key()),
+                                            TextElement.text(
+                                                    
DeploymentOptions.SHUTDOWN_IF_ATTACHED.key()))
+                                    .build());
+
+    /** Time interval for job client to report its heartbeat. */
+    public static final ConfigOption<Long> CLIENT_HEARTBEAT_INTERVAL =
+            key("client.heartbeat.interval")
+                    .longType()
+                    .defaultValue(30000L)
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "Time interval for job client to 
report its heartbeat "
+                                                    + "when '%s' and '%s' are 
both true. Cancel the job if timeout configured by '%s'.",
+                                            
TextElement.text(DeploymentOptions.ATTACHED.key()),
+                                            TextElement.text(
+                                                    
DeploymentOptions.SHUTDOWN_IF_ATTACHED.key()),
+                                            
TextElement.text(CLIENT_HEARTBEAT_TIMEOUT.key()))
+                                    .build());
 }
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientJobClientAdapter.java
 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientJobClientAdapter.java
index e776784e551..348d71d4163 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientJobClientAdapter.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientJobClientAdapter.java
@@ -142,6 +142,13 @@ public class ClusterClientJobClientAdapter<ClusterID>
                 clusterClient -> clusterClient.sendCoordinationRequest(jobID, 
operatorId, request));
     }
 
+    @Override
+    public void reportHeartbeat(long expiredTimestamp) {
+        bridgeClientRequest(
+                clusterClientProvider,
+                (clusterClient -> clusterClient.reportHeartbeat(jobID, 
expiredTimestamp)));
+    }
+
     private static <T> CompletableFuture<T> bridgeClientRequest(
             ClusterClientProvider<?> clusterClientProvider,
             Function<ClusterClient<?>, CompletableFuture<T>> resultRetriever) {
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/PipelineExecutorUtils.java
 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/PipelineExecutorUtils.java
index 76c5d67cc40..137dca5f2f6 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/PipelineExecutorUtils.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/PipelineExecutorUtils.java
@@ -21,8 +21,10 @@ package org.apache.flink.client.deployment.executors;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.dag.Pipeline;
 import org.apache.flink.client.FlinkPipelineTranslationUtil;
+import org.apache.flink.client.cli.ClientOptions;
 import org.apache.flink.client.cli.ExecutionConfigAccessor;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.configuration.PipelineOptionsInternal;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 
@@ -66,6 +68,12 @@ public class PipelineExecutorUtils {
                 .getOptional(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID)
                 .ifPresent(strJobID -> 
jobGraph.setJobID(JobID.fromHexString(strJobID)));
 
+        if (configuration.getBoolean(DeploymentOptions.ATTACHED)
+                && 
configuration.getBoolean(DeploymentOptions.SHUTDOWN_IF_ATTACHED)) {
+            jobGraph.setInitialClientHeartbeatTimeout(
+                    
configuration.getLong(ClientOptions.CLIENT_HEARTBEAT_TIMEOUT));
+        }
+
         jobGraph.addJars(executionConfigAccessor.getJars());
         jobGraph.setClasspaths(executionConfigAccessor.getClasspaths());
         
jobGraph.setSavepointRestoreSettings(executionConfigAccessor.getSavepointRestoreSettings());
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
index 0cbc27487d5..ff9bd87569d 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
@@ -31,6 +31,7 @@ import 
org.apache.flink.runtime.operators.coordination.CoordinationRequest;
 import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
 import org.apache.flink.util.AbstractID;
 import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.concurrent.FutureUtils;
 
 import javax.annotation.Nullable;
 
@@ -206,4 +207,14 @@ public interface ClusterClient<T> extends AutoCloseable {
     default CompletableFuture<Void> invalidateClusterDataset(AbstractID 
clusterDatasetId) {
         return CompletableFuture.completedFuture(null);
     }
+
+    /**
+     * The client reports the heartbeat to the dispatcher for aliveness.
+     *
+     * @param jobId The jobId for the client and the job.
+     * @return
+     */
+    default CompletableFuture<Void> reportHeartbeat(JobID jobId, long 
expiredTimestamp) {
+        return FutureUtils.completedVoidFuture();
+    }
 }
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
index a48403606ad..4cd0d5aaf92 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
@@ -22,6 +22,8 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.ExecutionEnvironmentFactory;
+import org.apache.flink.client.ClientUtils;
+import org.apache.flink.client.cli.ClientOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.core.execution.DetachedJobExecutionResult;
@@ -37,6 +39,7 @@ import org.slf4j.LoggerFactory;
 
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -95,6 +98,7 @@ public class ContextEnvironment extends ExecutionEnvironment {
             CompletableFuture<JobExecutionResult> jobExecutionResultFuture =
                     jobClient.getJobExecutionResult();
 
+            ScheduledExecutorService clientHeartbeatService = null;
             if 
(getConfiguration().getBoolean(DeploymentOptions.SHUTDOWN_IF_ATTACHED)) {
                 Thread shutdownHook =
                         ShutdownHookUtil.addShutdownHook(
@@ -112,9 +116,17 @@ public class ContextEnvironment extends 
ExecutionEnvironment {
                                         shutdownHook,
                                         
ContextEnvironment.class.getSimpleName(),
                                         LOG));
+                clientHeartbeatService =
+                        ClientUtils.reportHeartbeatPeriodically(
+                                jobClient,
+                                
getConfiguration().getLong(ClientOptions.CLIENT_HEARTBEAT_INTERVAL),
+                                
getConfiguration().getLong(ClientOptions.CLIENT_HEARTBEAT_TIMEOUT));
             }
 
             jobExecutionResult = jobExecutionResultFuture.get();
+            if (clientHeartbeatService != null) {
+                clientHeartbeatService.shutdown();
+            }
             if (!suppressSysout) {
                 System.out.println(jobExecutionResult);
             }
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
index 80b949daf6f..121b5dd2e64 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
@@ -195,6 +195,11 @@ public class MiniClusterClient implements 
ClusterClient<MiniClusterClient.MiniCl
         return miniCluster.invalidateClusterDataset(new 
IntermediateDataSetID(clusterDatasetId));
     }
 
+    @Override
+    public CompletableFuture<Void> reportHeartbeat(JobID jobId, long 
expiredTimestamp) {
+        return miniCluster.reportHeartbeat(jobId, expiredTimestamp);
+    }
+
     /** The type of the Cluster ID for the local {@link MiniCluster}. */
     public enum MiniClusterId {
         INSTANCE
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java
index efe5ada5912..9e7287c2afb 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java
@@ -22,6 +22,8 @@ import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.client.ClientUtils;
+import org.apache.flink.client.cli.ClientOptions;
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DeploymentOptions;
@@ -50,6 +52,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -140,6 +143,7 @@ public class StreamContextEnvironment extends 
StreamExecutionEnvironment {
             CompletableFuture<JobExecutionResult> jobExecutionResultFuture =
                     jobClient.getJobExecutionResult();
 
+            ScheduledExecutorService clientHeartbeatService = null;
             if 
(configuration.getBoolean(DeploymentOptions.SHUTDOWN_IF_ATTACHED)) {
                 Thread shutdownHook =
                         ShutdownHookUtil.addShutdownHook(
@@ -157,9 +161,17 @@ public class StreamContextEnvironment extends 
StreamExecutionEnvironment {
                                         shutdownHook,
                                         
StreamContextEnvironment.class.getSimpleName(),
                                         LOG));
+                clientHeartbeatService =
+                        ClientUtils.reportHeartbeatPeriodically(
+                                jobClient,
+                                
configuration.getLong(ClientOptions.CLIENT_HEARTBEAT_INTERVAL),
+                                
configuration.getLong(ClientOptions.CLIENT_HEARTBEAT_TIMEOUT));
             }
 
             jobExecutionResult = jobExecutionResultFuture.get();
+            if (clientHeartbeatService != null) {
+                clientHeartbeatService.shutdown();
+            }
             if (!suppressSysout) {
                 System.out.println(jobExecutionResult);
             }
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
index 6ec95eba137..a05350d5e58 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
@@ -58,6 +58,9 @@ import 
org.apache.flink.runtime.rest.messages.JobAccumulatorsInfo;
 import org.apache.flink.runtime.rest.messages.JobAccumulatorsMessageParameters;
 import org.apache.flink.runtime.rest.messages.JobCancellationHeaders;
 import org.apache.flink.runtime.rest.messages.JobCancellationMessageParameters;
+import org.apache.flink.runtime.rest.messages.JobClientHeartbeatHeaders;
+import org.apache.flink.runtime.rest.messages.JobClientHeartbeatParameters;
+import org.apache.flink.runtime.rest.messages.JobClientHeartbeatRequestBody;
 import org.apache.flink.runtime.rest.messages.JobMessageParameters;
 import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
@@ -727,6 +730,18 @@ public class RestClusterClient<T> implements 
ClusterClient<T> {
                 });
     }
 
+    @Override
+    public CompletableFuture<Void> reportHeartbeat(JobID jobId, long 
expiredTimestamp) {
+        JobClientHeartbeatParameters params =
+                new JobClientHeartbeatParameters().resolveJobId(jobId);
+        CompletableFuture<EmptyResponseBody> responseFuture =
+                sendRequest(
+                        JobClientHeartbeatHeaders.getInstance(),
+                        params,
+                        new JobClientHeartbeatRequestBody(expiredTimestamp));
+        return responseFuture.thenApply(ignore -> null);
+    }
+
     @Override
     public void shutDownCluster() {
         try {
diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/ClientHeartbeatTest.java 
b/flink-clients/src/test/java/org/apache/flink/client/ClientHeartbeatTest.java
new file mode 100644
index 00000000000..25bb9fbae9b
--- /dev/null
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/ClientHeartbeatTest.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.client.program.PerJobMiniClusterFactory;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.testutils.WaitingCancelableInvokable;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for client's heartbeat. */
+public class ClientHeartbeatTest {
+    private final long clientHeartbeatInterval = 50;
+    private final long clientHeartbeatTimeout = 500;
+
+    private MiniCluster miniCluster;
+
+    @AfterEach
+    void teardown() throws Exception {
+        if (miniCluster != null) {
+            miniCluster.close();
+        }
+    }
+
+    @Test
+    void testJobCancelledIfClientHeartbeatTimeout() throws Exception {
+        JobClient jobClient = submitJob(createConfiguration(true));
+
+        // The client doesn't report heartbeat to the dispatcher.
+
+        assertThat(jobClient.getJobExecutionResult())
+                .failsWithin(Duration.ofSeconds(1))
+                .withThrowableOfType(ExecutionException.class)
+                .withMessageContaining("Job was cancelled");
+
+        assertThat(miniCluster.isRunning()).isFalse();
+    }
+
+    @Test
+    void testJobRunningIfClientReportHeartbeat() throws Exception {
+        JobClient jobClient = submitJob(createConfiguration(true));
+
+        // The client reports heartbeat to the dispatcher.
+        ScheduledExecutorService heartbeatService =
+                ClientUtils.reportHeartbeatPeriodically(
+                        jobClient, clientHeartbeatInterval, 
clientHeartbeatTimeout);
+
+        Thread.sleep(2 * clientHeartbeatTimeout);
+        
assertThat(jobClient.getJobStatus().get()).isEqualTo(JobStatus.RUNNING);
+
+        heartbeatService.shutdown();
+    }
+
+    @Test
+    void testJobRunningIfDisableClientHeartbeat() throws Exception {
+        JobClient jobClient = submitJob(createConfiguration(false));
+
+        Thread.sleep(2 * clientHeartbeatTimeout);
+        
assertThat(jobClient.getJobStatus().get()).isEqualTo(JobStatus.RUNNING);
+    }
+
+    private Configuration createConfiguration(boolean shutdownOnAttachedExit) {
+        Configuration configuration = new Configuration();
+        if (shutdownOnAttachedExit) {
+            configuration.setBoolean(DeploymentOptions.ATTACHED, true);
+            configuration.setBoolean(DeploymentOptions.SHUTDOWN_IF_ATTACHED, 
true);
+        }
+        return configuration;
+    }
+
+    // The dispatcher deals with client heartbeat only when 
shutdownOnAttachedExit is true;
+    private JobClient submitJob(Configuration configuration) throws Exception {
+        PerJobMiniClusterFactory perJobMiniClusterFactory =
+                PerJobMiniClusterFactory.createWithFactory(
+                        configuration,
+                        config -> {
+                            miniCluster = new MiniCluster(config);
+                            return miniCluster;
+                        });
+
+        JobGraph cancellableJobGraph = getCancellableJobGraph();
+        // Enable heartbeat only when both execution.attached and
+        // execution.shutdown-on-attached-exit are true.
+        if (configuration.getBoolean(DeploymentOptions.ATTACHED)
+                && 
configuration.getBoolean(DeploymentOptions.SHUTDOWN_IF_ATTACHED)) {
+            
cancellableJobGraph.setInitialClientHeartbeatTimeout(clientHeartbeatTimeout);
+        }
+        return perJobMiniClusterFactory
+                .submitJob(cancellableJobGraph, 
ClassLoader.getSystemClassLoader())
+                .get();
+    }
+
+    private static JobGraph getCancellableJobGraph() {
+        JobVertex jobVertex = new JobVertex("jobVertex");
+        jobVertex.setInvokableClass(WaitingCancelableInvokable.class);
+        jobVertex.setParallelism(1);
+        return JobGraphTestUtils.streamingJobGraph(jobVertex);
+    }
+}
diff --git 
a/flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java 
b/flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java
index 0febcbc1cab..a0411937395 100644
--- a/flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java
+++ b/flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java
@@ -114,4 +114,7 @@ public interface JobClient {
 
     /** Returns the {@link JobExecutionResult result of the job execution} of 
the submitted job. */
     CompletableFuture<JobExecutionResult> getJobExecutionResult();
+
+    /** The client reports the heartbeat to the dispatcher for aliveness. */
+    default void reportHeartbeat(long expiredTimestamp) {}
 }
diff --git a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot 
b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
index de462b9f204..d4b2937e1c6 100644
--- a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
+++ b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
@@ -1838,6 +1838,32 @@
         }
       }
     }
+  }, {
+    "url" : "/jobs/:jobid/clientHeartbeat",
+    "method" : "PATCH",
+    "status-code" : "202 Accepted",
+    "file-upload" : false,
+    "path-parameters" : {
+      "pathParameters" : [ {
+        "key" : "jobid"
+      } ]
+    },
+    "query-parameters" : {
+      "queryParameters" : [ ]
+    },
+    "request" : {
+      "type" : "object",
+      "id" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:JobClientHeartbeatRequestBody",
+      "properties" : {
+        "expiredTimestamp" : {
+          "type" : "integer"
+        }
+      }
+    },
+    "response" : {
+      "type" : "object",
+      "id" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:EmptyResponseBody"
+    }
   }, {
     "url" : "/jobs/:jobid/config",
     "method" : "GET",
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index cfa36bad1df..83c493a2ad4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -30,6 +30,7 @@ import org.apache.flink.configuration.ClusterOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.PipelineOptions;
+import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.core.execution.CheckpointType;
 import org.apache.flink.core.execution.SavepointFormatType;
 import org.apache.flink.metrics.MetricGroup;
@@ -101,6 +102,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -108,6 +110,8 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
@@ -125,6 +129,8 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId>
 
     private static final int INITIAL_JOB_MANAGER_RUNNER_REGISTRY_CAPACITY = 16;
 
+    private static final long MAX_JOB_CLIENT_ALIVENESS_CHECK_INTERVAL = 60_000;
+
     private final Configuration configuration;
 
     private final JobGraphWriter jobGraphWriter;
@@ -170,6 +176,10 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId>
     private final ResourceCleaner localResourceCleaner;
     private final ResourceCleaner globalResourceCleaner;
 
+    private final Time webTimeout;
+    private final Map<JobID, Long> jobClientExpiredTimestamp = new HashMap<>();
+    private ScheduledFuture<?> jobClientAlivenessCheck;
+
     /** Enum to distinguish between initial job submission and re-submission 
for recovery. */
     protected enum ExecutionType {
         SUBMISSION,
@@ -282,6 +292,8 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId>
                 
resourceCleanerFactory.createLocalResourceCleaner(this.getMainThreadExecutor());
         this.globalResourceCleaner =
                 
resourceCleanerFactory.createGlobalResourceCleaner(this.getMainThreadExecutor());
+
+        this.webTimeout = 
Time.milliseconds(configuration.getLong(WebOptions.TIMEOUT));
     }
 
     // ------------------------------------------------------
@@ -354,6 +366,9 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId>
 
     private void runRecoveredJob(final JobGraph recoveredJob) {
         checkNotNull(recoveredJob);
+
+        initJobClientExpiredTime(recoveredJob);
+
         try {
             runJob(createJobMasterRunner(recoveredJob), 
ExecutionType.RECOVERY);
         } catch (Throwable throwable) {
@@ -365,6 +380,33 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId>
         }
     }
 
+    /**
+     * Starts tracking the job client's aliveness for the given job, provided that the job graph
+     * carries a positive initial client heartbeat timeout.
+     *
+     * <p>The first expiration timestamp is the current time plus that timeout. The periodic
+     * aliveness check is scheduled lazily when the first such job is seen, and the same check is
+     * reused for every job tracked afterwards.
+     *
+     * @param jobGraph job graph that is about to be run
+     */
+    private void initJobClientExpiredTime(JobGraph jobGraph) {
+        JobID jobID = jobGraph.getJobID();
+        long initialClientHeartbeatTimeout = jobGraph.getInitialClientHeartbeatTimeout();
+        if (initialClientHeartbeatTimeout > 0) {
+            log.info(
+                    "Begin to detect the client's aliveness for job {}. The heartbeat timeout is {}",
+                    jobID,
+                    initialClientHeartbeatTimeout);
+            jobClientExpiredTimestamp.put(
+                    jobID, System.currentTimeMillis() + initialClientHeartbeatTimeout);
+
+            if (jobClientAlivenessCheck == null) {
+                // The check is scheduled only once and then serves every tracked job, so its
+                // period must not depend solely on the first job's timeout. Cap it so that a
+                // large timeout cannot delay the detection for jobs that are submitted later.
+                final long checkInterval =
+                        Math.min(
+                                initialClientHeartbeatTimeout,
+                                MAX_JOB_CLIENT_ALIVENESS_CHECK_INTERVAL);
+                jobClientAlivenessCheck =
+                        this.getRpcService()
+                                .getScheduledExecutor()
+                                .scheduleWithFixedDelay(
+                                        // Hand the actual check over to the main thread executor.
+                                        () ->
+                                                getMainThreadExecutor()
+                                                        .execute(this::checkJobClientAliveness),
+                                        0L,
+                                        checkInterval,
+                                        TimeUnit.MILLISECONDS);
+            }
+        }
+    }
+
     private void startCleanupRetries() {
         recoveredDirtyJobs.forEach(this::runCleanupRetry);
         recoveredDirtyJobs.clear();
@@ -399,6 +441,11 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId>
     public CompletableFuture<Void> onStop() {
         log.info("Stopping dispatcher {}.", getAddress());
 
+        if (jobClientAlivenessCheck != null) {
+            jobClientAlivenessCheck.cancel(false);
+            jobClientAlivenessCheck = null;
+        }
+
         final CompletableFuture<Void> allJobsTerminationFuture =
                 terminateRunningJobsAndGetTerminationFuture();
 
@@ -570,6 +617,7 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId>
 
     private void persistAndRunJob(JobGraph jobGraph) throws Exception {
+        // Hand the job graph to the job graph writer first.
         jobGraphWriter.putJobGraph(jobGraph);
+        // Register the job for client aliveness tracking; this is a no-op unless the job graph
+        // carries a positive initial client heartbeat timeout.
+        initJobClientExpiredTime(jobGraph);
+        // Create the job master runner and run it as a fresh submission.
         runJob(createJobMasterRunner(jobGraph), ExecutionType.SUBMISSION);
     }
 
@@ -1016,6 +1064,43 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId>
                                 operatorId, serializedRequest, timeout));
     }
 
+    /**
+     * Records the expiration timestamp reported by a job client.
+     *
+     * <p>The timestamp is stored only when a job manager runner exists for the job; otherwise a
+     * warning is logged. The returned future does not depend on which branch was taken.
+     *
+     * @param jobId id of the job whose client sent the heartbeat
+     * @param expiredTimestamp new expiration timestamp to remember for the job
+     * @param timeout not used by this implementation
+     * @return the future obtained from {@code FutureUtils.completedVoidFuture()}
+     */
+    @Override
+    public CompletableFuture<Void> reportJobClientHeartbeat(
+            JobID jobId, long expiredTimestamp, Time timeout) {
+        if (getJobManagerRunner(jobId).isPresent()) {
+            log.debug(
+                    "Job {} receives client's heartbeat which expiredTimestamp is {}.",
+                    jobId,
+                    expiredTimestamp);
+            jobClientExpiredTimestamp.put(jobId, expiredTimestamp);
+        } else {
+            log.warn("Fail to find job {} for client.", jobId);
+        }
+        return FutureUtils.completedVoidFuture();
+    }
+
+    /**
+     * Walks over all tracked job clients and reacts to the ones that are gone or expired.
+     *
+     * <p>Entries whose job no longer has a job manager runner are dropped. For the remaining
+     * entries, a job is cancelled when its expiration timestamp is not later than the current
+     * time. The entry of a cancelled job is kept, so it is examined again on the next run.
+     */
+    private void checkJobClientAliveness() {
+        final long now = System.currentTimeMillis();
+        final Iterator<Map.Entry<JobID, Long>> trackedClients =
+                jobClientExpiredTimestamp.entrySet().iterator();
+        while (trackedClients.hasNext()) {
+            final Map.Entry<JobID, Long> tracked = trackedClients.next();
+            final JobID jobID = tracked.getKey();
+            final long deadline = tracked.getValue();
+
+            if (!getJobManagerRunner(jobID).isPresent()) {
+                // Nothing left to supervise for this job.
+                trackedClients.remove();
+                continue;
+            }
+
+            if (deadline <= now) {
+                log.warn(
+                        "The heartbeat from the job client is timeout and cancel the job {}. "
+                                + "You can adjust the heartbeat interval "
+                                + "by 'client.heartbeat.interval' and the timeout "
+                                + "by 'client.heartbeat.timeout'",
+                        jobID);
+                cancelJob(jobID, webTimeout);
+            }
+        }
+    }
+
     private void registerJobManagerRunnerTerminationFuture(
             JobID jobId, CompletableFuture<Void> 
jobManagerRunnerTerminationFuture) {
         
Preconditions.checkState(!jobManagerRunnerTerminationFutures.containsKey(jobId));
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
index bb821adda7b..7a739a40289 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
@@ -70,6 +70,8 @@ public class JobGraph implements Serializable {
 
     private static final long serialVersionUID = 1L;
 
+    private static final String INITIAL_CLIENT_HEARTBEAT_TIMEOUT = 
"initialClientHeartbeatTimeout";
+
     // --- job and configuration ---
 
     /** List of task vertices included in this job graph. */
@@ -652,4 +654,12 @@ public class JobGraph implements Serializable {
     public List<JobStatusHook> getJobStatusHooks() {
         return this.jobStatusHooks;
     }
+
+    /**
+     * Stores the initial client heartbeat timeout in the job configuration.
+     *
+     * @param initialClientHeartbeatTimeout value to store under the {@code
+     *     initialClientHeartbeatTimeout} key
+     */
+    public void setInitialClientHeartbeatTimeout(final long initialClientHeartbeatTimeout) {
+        jobConfiguration.setLong(
+                INITIAL_CLIENT_HEARTBEAT_TIMEOUT, initialClientHeartbeatTimeout);
+    }
+
+    /**
+     * Reads the initial client heartbeat timeout from the job configuration.
+     *
+     * @return the stored value, or {@link Long#MIN_VALUE} when no value has been stored
+     */
+    public long getInitialClientHeartbeatTimeout() {
+        final long fallbackWhenUnset = Long.MIN_VALUE;
+        return jobConfiguration.getLong(INITIAL_CLIENT_HEARTBEAT_TIMEOUT, fallbackWhenUnset);
+    }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 98614e3ecc9..59240a14c03 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -1341,6 +1341,13 @@ public class MiniCluster implements AutoCloseableAsync {
                                         metaInfoMap -> new 
HashSet<>(metaInfoMap.keySet())));
     }
 
+    /**
+     * Reports a job client heartbeat through the dispatcher gateway.
+     *
+     * @param jobId id of the job whose client reports the heartbeat
+     * @param expiredTimestamp expiration timestamp handed to the dispatcher unchanged
+     * @return the future produced by the dispatcher command
+     */
+    public CompletableFuture<Void> reportHeartbeat(JobID jobId, long expiredTimestamp) {
+        return runDispatcherCommand(
+                gateway -> gateway.reportJobClientHeartbeat(jobId, expiredTimestamp, rpcTimeout));
+    }
+
     /** Internal factory for {@link RpcService}. */
     protected interface RpcServiceFactory {
         RpcService createRpcService() throws Exception;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobClient.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobClient.java
index 6310b62b34c..cee75957f1e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobClient.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobClient.java
@@ -158,6 +158,11 @@ public final class MiniClusterJobClient implements 
JobClient, CoordinationReques
         }
     }
 
+    /**
+     * Forwards the client heartbeat for this client's job to the mini cluster.
+     *
+     * @param expiredTimestamp expiration timestamp that is passed on unchanged
+     */
+    @Override
+    public void reportHeartbeat(long expiredTimestamp) {
+        // The value returned by MiniCluster#reportHeartbeat is not used here.
+        miniCluster.reportHeartbeat(jobID, expiredTimestamp);
+    }
+
     private static void shutDownCluster(MiniCluster miniCluster) {
         miniCluster
                 .closeAsync()
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobClientHeartbeatHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobClientHeartbeatHandler.java
new file mode 100644
index 00000000000..2487b4bdf2c
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobClientHeartbeatHandler.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.JobClientHeartbeatParameters;
+import org.apache.flink.runtime.rest.messages.JobClientHeartbeatRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+
+/**
+ * REST handler that accepts a job client's heartbeat and passes it on to the {@link
+ * RestfulGateway}.
+ */
+public class JobClientHeartbeatHandler
+        extends AbstractRestHandler<
+                RestfulGateway,
+                JobClientHeartbeatRequestBody,
+                EmptyResponseBody,
+                JobClientHeartbeatParameters> {
+    private static final Logger LOG = LoggerFactory.getLogger(JobClientHeartbeatHandler.class);
+
+    /** Creates the handler; all arguments are passed straight to the super constructor. */
+    public JobClientHeartbeatHandler(
+            GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+            Time timeout,
+            Map<String, String> headers,
+            MessageHeaders<
+                            JobClientHeartbeatRequestBody,
+                            EmptyResponseBody,
+                            JobClientHeartbeatParameters>
+                    messageHeaders) {
+        super(leaderRetriever, timeout, headers, messageHeaders);
+    }
+
+    /**
+     * Reports the heartbeat contained in the request to the gateway.
+     *
+     * @param request request carrying the job id path parameter and the expiration timestamp
+     * @param gateway gateway the heartbeat is reported to
+     * @return future with the empty response body on success; on failure it completes
+     *     exceptionally with a {@link RestHandlerException} (internal server error) wrapped in a
+     *     {@link CompletionException}
+     */
+    @Override
+    public CompletableFuture<EmptyResponseBody> handleRequest(
+            HandlerRequest<JobClientHeartbeatRequestBody> request, RestfulGateway gateway)
+            throws RestHandlerException {
+        return gateway.reportJobClientHeartbeat(
+                        request.getPathParameter(JobIDPathParameter.class),
+                        request.getRequestBody().getExpiredTimestamp(),
+                        timeout)
+                .handle(
+                        (ignored, failure) -> {
+                            if (failure == null) {
+                                return EmptyResponseBody.getInstance();
+                            }
+
+                            // Log the failure and surface it to the caller as a REST error.
+                            final String errorMessage =
+                                    "Fail to report jobClient's heartbeat: "
+                                            + failure.getMessage();
+                            LOG.error(errorMessage, failure);
+                            final RestHandlerException restFailure =
+                                    new RestHandlerException(
+                                            errorMessage,
+                                            HttpResponseStatus.INTERNAL_SERVER_ERROR,
+                                            failure);
+                            throw new CompletionException(restFailure);
+                        });
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobClientHeartbeatHeaders.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobClientHeartbeatHeaders.java
new file mode 100644
index 00000000000..0bb3e7f3d94
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobClientHeartbeatHeaders.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.handler.job.JobClientHeartbeatHandler;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * Singleton message headers describing the REST endpoint served by the {@link
+ * JobClientHeartbeatHandler}.
+ */
+public class JobClientHeartbeatHeaders
+        implements RuntimeMessageHeaders<
+                JobClientHeartbeatRequestBody, EmptyResponseBody, JobClientHeartbeatParameters> {
+
+    /** URL pattern of the endpoint; {@code :jobid} is the job id path parameter. */
+    public static final String URL = "/jobs/:jobid/clientHeartbeat";
+
+    private static final JobClientHeartbeatHeaders INSTANCE = new JobClientHeartbeatHeaders();
+
+    /** Not instantiable from outside; use {@link #getInstance()}. */
+    private JobClientHeartbeatHeaders() {}
+
+    /** Returns the shared instance of these headers. */
+    public static JobClientHeartbeatHeaders getInstance() {
+        return INSTANCE;
+    }
+
+    @Override
+    public HttpMethodWrapper getHttpMethod() {
+        return HttpMethodWrapper.PATCH;
+    }
+
+    @Override
+    public String getTargetRestEndpointURL() {
+        return URL;
+    }
+
+    @Override
+    public Class<JobClientHeartbeatRequestBody> getRequestClass() {
+        return JobClientHeartbeatRequestBody.class;
+    }
+
+    @Override
+    public Class<EmptyResponseBody> getResponseClass() {
+        return EmptyResponseBody.class;
+    }
+
+    @Override
+    public HttpResponseStatus getResponseStatusCode() {
+        return HttpResponseStatus.ACCEPTED;
+    }
+
+    /** Returns a fresh parameter object on every call. */
+    @Override
+    public JobClientHeartbeatParameters getUnresolvedMessageParameters() {
+        return new JobClientHeartbeatParameters();
+    }
+
+    @Override
+    public String getDescription() {
+        return "Report the jobClient's aliveness.";
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobClientHeartbeatParameters.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobClientHeartbeatParameters.java
new file mode 100644
index 00000000000..96aaa9ef492
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobClientHeartbeatParameters.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.api.common.JobID;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * Message parameters of the client heartbeat request: a single job id path parameter and no query
+ * parameters.
+ */
+public class JobClientHeartbeatParameters extends MessageParameters {
+
+    private final JobIDPathParameter jobIdParameter = new JobIDPathParameter();
+
+    /**
+     * Resolves the job id path parameter.
+     *
+     * @param jobId job id to resolve the path parameter with
+     * @return this instance, to allow call chaining
+     */
+    public JobClientHeartbeatParameters resolveJobId(JobID jobId) {
+        jobIdParameter.resolve(jobId);
+        return this;
+    }
+
+    /** Returns a collection holding only the job id path parameter. */
+    @Override
+    public Collection<MessagePathParameter<?>> getPathParameters() {
+        return Collections.singleton(jobIdParameter);
+    }
+
+    /** Returns an empty collection, as this request has no query parameters. */
+    @Override
+    public Collection<MessageQueryParameter<?>> getQueryParameters() {
+        return Collections.emptyList();
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobClientHeartbeatRequestBody.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobClientHeartbeatRequestBody.java
new file mode 100644
index 00000000000..d38cd5405c7
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobClientHeartbeatRequestBody.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * {@link RequestBody} to report heartbeat for client.
+ *
+ * <p>It is written as a JSON object with the single property {@code expiredTimestamp}.
+ */
+public class JobClientHeartbeatRequestBody implements RequestBody {
+    private static final String EXPIRED_TIMESTAMP = "expiredTimestamp";
+
+    @JsonProperty(EXPIRED_TIMESTAMP)
+    private final long expiredTimestamp;
+
+    /**
+     * Creates the request body.
+     *
+     * <p>The parameter is bound to the {@code expiredTimestamp} property explicitly. Without the
+     * annotation a single-argument creator may be treated as a delegating creator, which expects
+     * a bare number instead of the JSON object that is produced when this class is serialized.
+     *
+     * @param expiredTimestamp expiration timestamp reported by the client
+     */
+    @JsonCreator
+    public JobClientHeartbeatRequestBody(
+            @JsonProperty(EXPIRED_TIMESTAMP) long expiredTimestamp) {
+        this.expiredTimestamp = expiredTimestamp;
+    }
+
+    /**
+     * Returns the expiration timestamp carried by this request.
+     *
+     * <p>The getter is ignored by Jackson because the annotated field already defines the
+     * property.
+     */
+    @JsonIgnore
+    public long getExpiredTimestamp() {
+        return expiredTimestamp;
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
index 4f9085cda2e..4a7cf5dddcb 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
@@ -44,6 +44,7 @@ import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.RpcTimeout;
 import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.concurrent.FutureUtils;
 
 import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
@@ -271,4 +272,10 @@ public interface RestfulGateway extends RpcGateway {
             @RpcTimeout Time timeout) {
         throw new UnsupportedOperationException();
     }
+
+    /**
+     * The client reports the heartbeat to the dispatcher for aliveness.
+     *
+     * <p>This default implementation does not record anything and only returns the future obtained
+     * from {@code FutureUtils.completedVoidFuture()}.
+     *
+     * @param jobId id of the job whose client reports the heartbeat
+     * @param expiredTimestamp expiration timestamp reported by the client
+     * @param timeout timeout of the RPC; annotated with {@link RpcTimeout} like the other methods
+     *     of this gateway so that it is recognised as the call's timeout
+     * @return future for the acknowledgement of the heartbeat
+     */
+    default CompletableFuture<Void> reportJobClientHeartbeat(
+            JobID jobId, long expiredTimestamp, @RpcTimeout Time timeout) {
+        return FutureUtils.completedVoidFuture();
+    }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
index 1ab048995aa..7836d720237 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
@@ -46,6 +46,7 @@ import 
org.apache.flink.runtime.rest.handler.dataset.ClusterDataSetListHandler;
 import org.apache.flink.runtime.rest.handler.job.GeneratedLogUrlHandler;
 import org.apache.flink.runtime.rest.handler.job.JobAccumulatorsHandler;
 import org.apache.flink.runtime.rest.handler.job.JobCancellationHandler;
+import org.apache.flink.runtime.rest.handler.job.JobClientHeartbeatHandler;
 import org.apache.flink.runtime.rest.handler.job.JobConfigHandler;
 import org.apache.flink.runtime.rest.handler.job.JobDetailsHandler;
 import org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler;
@@ -101,6 +102,7 @@ import 
org.apache.flink.runtime.rest.messages.ClusterOverviewHeaders;
 import org.apache.flink.runtime.rest.messages.DashboardConfigurationHeaders;
 import org.apache.flink.runtime.rest.messages.JobAccumulatorsHeaders;
 import org.apache.flink.runtime.rest.messages.JobCancellationHeaders;
+import org.apache.flink.runtime.rest.messages.JobClientHeartbeatHeaders;
 import org.apache.flink.runtime.rest.messages.JobConfigHeaders;
 import org.apache.flink.runtime.rest.messages.JobExceptionsHeaders;
 import 
org.apache.flink.runtime.rest.messages.JobIdsWithStatusesOverviewHeaders;
@@ -692,6 +694,13 @@ public class WebMonitorEndpoint<T extends RestfulGateway> 
extends RestServerEndp
                 new ShutdownHandler(
                         leaderRetriever, timeout, responseHeaders, 
ShutdownHeaders.getInstance());
 
+        final JobClientHeartbeatHandler jobClientHeartbeatHandler =
+                new JobClientHeartbeatHandler(
+                        leaderRetriever,
+                        timeout,
+                        responseHeaders,
+                        JobClientHeartbeatHeaders.getInstance());
+
         final File webUiDir = restConfiguration.getWebUiDir();
 
         Optional<StaticFileServerHandler<T>> optWebContent;
@@ -878,6 +887,10 @@ public class WebMonitorEndpoint<T extends RestfulGateway> 
extends RestServerEndp
 
         handlers.add(Tuple2.of(shutdownHandler.getMessageHeaders(), 
shutdownHandler));
 
+        handlers.add(
+                Tuple2.of(
+                        jobClientHeartbeatHandler.getMessageHeaders(), 
jobClientHeartbeatHandler));
+
         optWebContent.ifPresent(
                 webContent -> {
                     handlers.add(

Reply via email to