xintongsong commented on code in PR #21347:
URL: https://github.com/apache/flink/pull/21347#discussion_r1051748186


##########
flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java:
##########
@@ -1344,6 +1344,13 @@ public CompletableFuture<Set<AbstractID>> listCompletedClusterDatasetIds() {
                                         metaInfoMap -> new HashSet<>(metaInfoMap.keySet())));
     }
 
+    public CompletableFuture<Void> reportHeartbeat(JobID jobId, long expiredTimestamp) {
+        return runDispatcherCommand(
+                dispatcherGateway ->
+                        dispatcherGateway.reportJobClientHeartbeat(
+                                jobId, expiredTimestamp, rpcTimeout));
+    }

Review Comment:
   I wonder whether this feature should be supported for `MiniCluster`. In local 
mode, the client and the mini cluster run in the same process, so heartbeats 
should not be necessary.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobClientHeartbeatHandler.java:
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.JobClientHeartbeatParameters;
+import org.apache.flink.runtime.rest.messages.JobClientHeartbeatRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/** Receive the heartbeat from the client. */
+public class JobClientHeartbeatHandler
+        extends AbstractRestHandler<
+                RestfulGateway,
+                JobClientHeartbeatRequestBody,
+                EmptyResponseBody,
+                JobClientHeartbeatParameters> {
+    public JobClientHeartbeatHandler(
+            GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+            Time timeout,
+            Map<String, String> headers,
+            MessageHeaders<
+                            JobClientHeartbeatRequestBody,
+                            EmptyResponseBody,
+                            JobClientHeartbeatParameters>
+                    messageHeaders) {
+        super(leaderRetriever, timeout, headers, messageHeaders);
+    }
+
+    @Override
+    public CompletableFuture<EmptyResponseBody> handleRequest(
+            HandlerRequest<JobClientHeartbeatRequestBody> request, RestfulGateway gateway)
+            throws RestHandlerException {
+        final JobID jobId = request.getPathParameter(JobIDPathParameter.class);
+        gateway.reportJobClientHeartbeat(
+                jobId, request.getRequestBody().getExpiredTimestamp(), timeout);

Review Comment:
   The `CompletableFuture` returned from 
`RestfulGateway#reportJobClientHeartbeat` is discarded. That means that if the 
call fails, the exception is silently ignored.
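
   To illustrate the difference, here is a minimal standalone sketch using only 
`java.util.concurrent`. The class, the `reportHeartbeat` stand-in, and the 
string response are hypothetical and are not the Flink API; the point is only 
that chaining on the returned future lets a failure reach the caller, whereas 
discarding it does not.

```java
import java.util.concurrent.CompletableFuture;

public class HeartbeatFutureSketch {

    /** Hypothetical stand-in for the gateway call; it always fails to show the error path. */
    public static CompletableFuture<Void> reportHeartbeat(long expiredTimestamp) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        result.completeExceptionally(new IllegalStateException("dispatcher unavailable"));
        return result;
    }

    /** Discards the gateway future: the caller sees success even though the call failed. */
    public static CompletableFuture<String> handleDiscarding(long expiredTimestamp) {
        reportHeartbeat(expiredTimestamp);
        return CompletableFuture.completedFuture("empty-response");
    }

    /** Chains on the gateway future: a failure propagates to the caller. */
    public static CompletableFuture<String> handleChaining(long expiredTimestamp) {
        return reportHeartbeat(expiredTimestamp).thenApply(ignored -> "empty-response");
    }

    public static void main(String[] args) {
        System.out.println(handleDiscarding(1L).isCompletedExceptionally()); // false
        System.out.println(handleChaining(1L).isCompletedExceptionally());   // true
    }
}
```

   In the handler, the analogous change would presumably be to return the 
gateway future mapped to the empty response body instead of dropping it.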



##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -603,6 +634,17 @@ private void runJob(JobManagerRunner jobManagerRunner, ExecutionType executionTy
 
         final JobID jobId = jobManagerRunner.getJobID();
 
+        if (configuration.getBoolean(DeploymentOptions.SHUTDOWN_IF_ATTACHED)) {
+            log.info("Begin to receive the client's heartbeat for aliveness check.");
+            // Use client timeout from the configuration when the job is submitting
+            // but the client hasn't reported heartbeat.

Review Comment:
   This could result in jobs being canceled unexpectedly if the heartbeat 
interval in the client-side configuration is larger than the timeout in the 
cluster-side configuration.
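
   A small arithmetic sketch of that scenario follows. It rests on two 
assumptions that the diff does not confirm: the dispatcher starts its initial 
deadline at submission time using the cluster-side timeout, and the client 
sends its first heartbeat one client-side interval after submission. The class 
and method names are hypothetical, and the sample numbers are simply the 
default values quoted in this PR.

```java
public class InitialHeartbeatWindowSketch {

    /**
     * Returns true if the dispatcher's initial deadline passes before the first
     * client heartbeat arrives.
     *
     * Assumptions: deadline = submit time + cluster-side timeout;
     * first heartbeat = submit time + client-side interval.
     */
    public static boolean expiresBeforeFirstHeartbeat(
            long submitTimeMs, long clusterTimeoutMs, long clientIntervalMs) {
        long dispatcherDeadline = submitTimeMs + clusterTimeoutMs;
        long firstHeartbeatAt = submitTimeMs + clientIntervalMs;
        return firstHeartbeatAt > dispatcherDeadline;
    }

    public static void main(String[] args) {
        // Cluster-side timeout 18 s, client-side interval 30 s: the job would be canceled.
        System.out.println(expiresBeforeFirstHeartbeat(0L, 18_000L, 30_000L)); // true
        // Client-side interval 5 s fits inside the 18 s window.
        System.out.println(expiresBeforeFirstHeartbeat(0L, 18_000L, 5_000L));  // false
    }
}
```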



##########
flink-core/src/main/java/org/apache/flink/configuration/HeartbeatManagerOptions.java:
##########
@@ -71,6 +71,37 @@ public class HeartbeatManagerOptions {
                                             TextElement.code("-1"))
                                     .build());
 
+    /** Timeout for job client to report its heartbeat. */
+    @Documentation.Section(Documentation.Sections.EXPERT_FAULT_TOLERANCE)
+    public static final ConfigOption<Long> CLIENT_HEARTBEAT_TIMEOUT =
+            key("client.heartbeat.timeout")
+                    .longType()
+                    .defaultValue(18000L)
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "Cancel the job if the dispatcher hasn't received the client's"
+                                                    + " heartbeat after timeout when '%s' is set true.",
+                                            TextElement.text(
+                                                    DeploymentOptions.SHUTDOWN_IF_ATTACHED.key()))
+                                    .build());
+
+    /** Time interval for job client to report its heartbeat. */
+    @Documentation.Section(Documentation.Sections.EXPERT_FAULT_TOLERANCE)
+    public static final ConfigOption<Long> CLIENT_HEARTBEAT_INTERVAL =
+            key("client.heartbeat.interval")
+                    .longType()
+                    .defaultValue(30000L)
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "Time interval for job client to report its heartbeat "
+                                                    + "when '%s' is set true. Cancel the job if timeout configured by '%s'.",
+                                            TextElement.text(
+                                                    DeploymentOptions.SHUTDOWN_IF_ATTACHED.key()),
+                                            TextElement.text(CLIENT_HEARTBEAT_TIMEOUT.key()))
+                                    .build());
+

Review Comment:
   I think these two options do not belong in `HeartbeatManagerOptions`, because 
the feature does not use `HeartbeatManager`. I'd suggest moving them to 
`ClientOptions`.



##########
flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java:
##########
@@ -144,4 +149,29 @@ public static void waitUntilJobInitializationFinished(
             throw new RuntimeException("Error while waiting for job to be initialized", throwable);
         }
     }
+
+    /**
+     * The client reports the heartbeat to the dispatcher for aliveness.
+     *
+     * @param jobClient The job client.
+     * @param interval The heartbeat interval.
+     * @param timeout The heartbeat timeout.
+     * @return The ScheduledExecutorService which reports heartbeat periodically.
+     */
+    public static ScheduledExecutorService reportHeartbeatPeriodically(
+            JobClient jobClient, long interval, long timeout) {

Review Comment:
   It would be better to check that `timeout` is larger than `interval` and, if 
not, to fail with a descriptive error message.
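
   A minimal sketch of such a check, written in plain Java so that it stands 
alone. The class and method names are hypothetical; in the Flink code base an 
existing argument-checking helper would likely be used instead of the explicit 
`if` statements.

```java
public class HeartbeatArgumentCheckSketch {

    /**
     * Validates the heartbeat settings before any scheduling starts.
     *
     * @throws IllegalArgumentException if the interval is not positive or the
     *     timeout is not strictly larger than the interval
     */
    public static void checkHeartbeatArguments(long interval, long timeout) {
        if (interval <= 0) {
            throw new IllegalArgumentException(
                    String.format("The heartbeat interval must be positive, but was %d.", interval));
        }
        if (timeout <= interval) {
            throw new IllegalArgumentException(
                    String.format(
                            "The heartbeat timeout (%d) must be larger than the heartbeat interval (%d).",
                            timeout, interval));
        }
    }

    public static void main(String[] args) {
        checkHeartbeatArguments(1_000L, 5_000L); // passes silently
        try {
            checkHeartbeatArguments(30_000L, 18_000L);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

   Failing fast here means a misconfiguration shows up when the client starts 
reporting heartbeats rather than as an unexplained job cancellation later.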



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
