xintongsong commented on code in PR #21347:
URL: https://github.com/apache/flink/pull/21347#discussion_r1051748186
##########
flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java:
##########
@@ -1344,6 +1344,13 @@ public CompletableFuture<Set<AbstractID>> listCompletedClusterDatasetIds() {
metaInfoMap -> new HashSet<>(metaInfoMap.keySet())));
}
+ public CompletableFuture<Void> reportHeartbeat(JobID jobId, long expiredTimestamp) {
+ return runDispatcherCommand(
+ dispatcherGateway ->
+ dispatcherGateway.reportJobClientHeartbeat(
+ jobId, expiredTimestamp, rpcTimeout));
+ }
Review Comment:
I wonder if this feature should be supported for `MiniCluster`. In local
mode, the client and the mini cluster are in the same process, thus heartbeats
should not be necessary.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobClientHeartbeatHandler.java:
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.JobClientHeartbeatParameters;
+import org.apache.flink.runtime.rest.messages.JobClientHeartbeatRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/** Receive the heartbeat from the client. */
+public class JobClientHeartbeatHandler
+ extends AbstractRestHandler<
+ RestfulGateway,
+ JobClientHeartbeatRequestBody,
+ EmptyResponseBody,
+ JobClientHeartbeatParameters> {
+ public JobClientHeartbeatHandler(
+ GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+ Time timeout,
+ Map<String, String> headers,
+ MessageHeaders<
+ JobClientHeartbeatRequestBody,
+ EmptyResponseBody,
+ JobClientHeartbeatParameters>
+ messageHeaders) {
+ super(leaderRetriever, timeout, headers, messageHeaders);
+ }
+
+ @Override
+ public CompletableFuture<EmptyResponseBody> handleRequest(
+ HandlerRequest<JobClientHeartbeatRequestBody> request, RestfulGateway gateway)
+ throws RestHandlerException {
+ final JobID jobId = request.getPathParameter(JobIDPathParameter.class);
+ gateway.reportJobClientHeartbeat(
+ jobId, request.getRequestBody().getExpiredTimestamp(), timeout);
Review Comment:
The `CompletableFuture` returned from
`RestfulGateway#reportJobClientHeartbeat` is discarded. That means if anything
goes wrong, the exception will be ignored.
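One way to avoid dropping the failure is to chain the response on the gateway's future instead of ignoring it. The following is only a minimal sketch using plain `java.util.concurrent` types: `FakeGateway`, `EmptyBody` and `HeartbeatHandlerSketch` are hypothetical stand-ins, not Flink classes, and `handleDiscarding` is an assumed reconstruction of the pattern described above, since the rest of the handler is not shown in the hunk.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical stand-in for the gateway; not a Flink interface. */
interface FakeGateway {
    CompletableFuture<Void> reportJobClientHeartbeat(String jobId, long expiredTimestamp);
}

/** Hypothetical stand-in for an empty REST response body. */
final class EmptyBody {
    static final EmptyBody INSTANCE = new EmptyBody();

    private EmptyBody() {}
}

final class HeartbeatHandlerSketch {
    /** Ignores the gateway future, so a failed heartbeat still looks successful. */
    static CompletableFuture<EmptyBody> handleDiscarding(
            FakeGateway gateway, String jobId, long expiredTimestamp) {
        gateway.reportJobClientHeartbeat(jobId, expiredTimestamp);
        return CompletableFuture.completedFuture(EmptyBody.INSTANCE);
    }

    /** Chains on the gateway future, so a failure completes the result exceptionally. */
    static CompletableFuture<EmptyBody> handlePropagating(
            FakeGateway gateway, String jobId, long expiredTimestamp) {
        return gateway.reportJobClientHeartbeat(jobId, expiredTimestamp)
                .thenApply(ignored -> EmptyBody.INSTANCE);
    }
}
```

With the chained variant, the caller of the handler sees the exception and can turn it into an error response.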
##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -603,6 +634,17 @@ private void runJob(JobManagerRunner jobManagerRunner, ExecutionType executionTy
final JobID jobId = jobManagerRunner.getJobID();
+ if (configuration.getBoolean(DeploymentOptions.SHUTDOWN_IF_ATTACHED)) {
+ log.info("Begin to receive the client's heartbeat for aliveness check.");
+ // Use client timeout from the configuration when the job is submitting
+ // but the client hasn't reported heartbeat.
Review Comment:
This could result in jobs being canceled unexpectedly if the heartbeat interval
in the client-side configuration is larger than the timeout in the cluster-side
configuration.
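To make the concern concrete, here is a small arithmetic sketch. It rests on two assumptions that the diff does not confirm: the dispatcher starts counting from job submission using its own timeout, and the client sends its first heartbeat one full interval after submission. `InitialHeartbeatWindow` and its methods are hypothetical names used only for illustration.

```java
final class InitialHeartbeatWindow {
    /** Time at which the dispatcher would give up, assuming it counts from submission. */
    static long expiryTime(long submitTime, long clusterSideTimeout) {
        return submitTime + clusterSideTimeout;
    }

    /** Time of the first heartbeat, assuming the client waits one full interval first. */
    static long firstHeartbeatTime(long submitTime, long clientSideInterval) {
        return submitTime + clientSideInterval;
    }

    /** True if the cluster-side timeout elapses before the first heartbeat can arrive. */
    static boolean cancelledBeforeFirstHeartbeat(
            long submitTime, long clusterSideTimeout, long clientSideInterval) {
        return firstHeartbeatTime(submitTime, clientSideInterval)
                >= expiryTime(submitTime, clusterSideTimeout);
    }
}
```

Under these assumptions, a cluster-side timeout of 18000 combined with a client-side interval of 30000 would cancel the job before any heartbeat is reported.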
##########
flink-core/src/main/java/org/apache/flink/configuration/HeartbeatManagerOptions.java:
##########
@@ -71,6 +71,37 @@ public class HeartbeatManagerOptions {
TextElement.code("-1"))
.build());
+ /** Timeout for job client to report its heartbeat. */
+ @Documentation.Section(Documentation.Sections.EXPERT_FAULT_TOLERANCE)
+ public static final ConfigOption<Long> CLIENT_HEARTBEAT_TIMEOUT =
+ key("client.heartbeat.timeout")
+ .longType()
+ .defaultValue(18000L)
+ .withDescription(
+ Description.builder()
+ .text(
+ "Cancel the job if the dispatcher hasn't received the client's"
+ + " heartbeat after timeout when '%s' is set true.",
+ TextElement.text(
+ DeploymentOptions.SHUTDOWN_IF_ATTACHED.key()))
+ .build());
+
+ /** Time interval for job client to report its heartbeat. */
+ @Documentation.Section(Documentation.Sections.EXPERT_FAULT_TOLERANCE)
+ public static final ConfigOption<Long> CLIENT_HEARTBEAT_INTERVAL =
+ key("client.heartbeat.interval")
+ .longType()
+ .defaultValue(30000L)
+ .withDescription(
+ Description.builder()
+ .text(
+ "Time interval for job client to report its heartbeat "
+ + "when '%s' is set true. Cancel the job if timeout configured by '%s'.",
+ TextElement.text(
+ DeploymentOptions.SHUTDOWN_IF_ATTACHED.key()),
+ TextElement.text(CLIENT_HEARTBEAT_TIMEOUT.key()))
+ .build());
+
Review Comment:
I think these two options do not belong in `HeartbeatManagerOptions`, because
the feature does not use `HeartbeatManager`. I'd suggest moving them to
`ClientOptions`.
##########
flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java:
##########
@@ -144,4 +149,29 @@ public static void waitUntilJobInitializationFinished(
throw new RuntimeException("Error while waiting for job to be initialized", throwable);
}
}
+
+ /**
+ * The client reports the heartbeat to the dispatcher for aliveness.
+ *
+ * @param jobClient The job client.
+ * @param interval The heartbeat interval.
+ * @param timeout The heartbeat timeout.
+ * @return The ScheduledExecutorService which reports heartbeat periodically.
+ */
+ public static ScheduledExecutorService reportHeartbeatPeriodically(
+ JobClient jobClient, long interval, long timeout) {
Review Comment:
It would be better to check that `timeout` is larger than `interval` and, if
not, fail with a descriptive error message.
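A minimal sketch of such a check, assuming both values use the same unit. `HeartbeatSettingsCheck` and `checkTimeoutLargerThanInterval` are hypothetical names, not existing Flink utilities; in the PR the check could equally be inlined at the start of `reportHeartbeatPeriodically`.

```java
final class HeartbeatSettingsCheck {
    /**
     * Hypothetical helper: rejects settings where the timeout does not exceed the interval.
     *
     * @throws IllegalArgumentException if timeout is not larger than interval
     */
    static void checkTimeoutLargerThanInterval(long interval, long timeout) {
        if (timeout <= interval) {
            throw new IllegalArgumentException(
                    "The client heartbeat timeout ("
                            + timeout
                            + ") must be larger than the client heartbeat interval ("
                            + interval
                            + ").");
        }
    }
}
```

Note that the defaults in the diff above (timeout `18000L`, interval `30000L`) would be rejected by a check like this.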