xintongsong commented on code in PR #21347:
URL: https://github.com/apache/flink/pull/21347#discussion_r1036742632
##########
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java:
##########
@@ -271,4 +271,9 @@ default CompletableFuture<CoordinationResponse>
deliverCoordinationRequestToCoor
@RpcTimeout Time timeout) {
throw new UnsupportedOperationException();
}
+
+ /** The client reports the heartbeat to the dispatcher for aliveness. */
+ default CompletableFuture<Void> reportJobClientHeartbeat(JobID jobId, Time timeout) {
+ return CompletableFuture.completedFuture(null);
Review Comment:
Use `FutureUtils.completedVoidFuture()` to reduce object creation.
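To illustrate the point, here is a minimal sketch in plain Java of reusing one already-completed future instead of allocating a new one per call. The class and method names (`CompletedFutures`, `completedVoid`) are hypothetical and this is not a copy of Flink's `FutureUtils`; it only shows the idea.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical helper for illustration only; not Flink code. */
final class CompletedFutures {

    // One shared, already-completed instance, created once at class load.
    private static final CompletableFuture<Void> COMPLETED_VOID =
            CompletableFuture.completedFuture(null);

    private CompletedFutures() {}

    /** Returns the shared instance instead of allocating a new future per call. */
    static CompletableFuture<Void> completedVoid() {
        return COMPLETED_VOID;
    }
}
```

Callers that only need to signal "done, no value" can return `CompletedFutures.completedVoid()`, so a frequently invoked method such as a heartbeat handler allocates nothing for its return value.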
##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -1016,6 +1034,47 @@ public CompletableFuture<CoordinationResponse>
deliverCoordinationRequestToCoord
operatorId, serializedRequest, timeout));
}
+ @Override
+ public CompletableFuture<Void> reportJobClientHeartbeat(JobID jobId, Time timeout) {
+ if (!getJobManagerRunner(jobId).isPresent()) {
+ log.warn("Fail to find job {} for client.", jobId);
+ } else {
+ log.debug("Job {} receives client's heartbeat.", jobId);
+ markJobClientAliveness(jobId);
+ resetJobClientAlivenessCheck(jobId);
+ }
+ return CompletableFuture.completedFuture(null);
+ }
+
+ private void markJobClientAliveness(JobID jobId) {
+ if (jobClientAlivenessFutures.containsKey(jobId)) {
+ jobClientAlivenessFutures.get(jobId).complete(null);
+ }
+ }
+
+ private void resetJobClientAlivenessCheck(JobID jobId) {
+ CompletableFuture<Void> clientAlivenessFuture = new CompletableFuture<>();
+ jobClientAlivenessFutures.put(jobId, clientAlivenessFuture);
+ FutureUtils.orTimeout(
+ clientAlivenessFuture,
+ jobClientHeartbeatTimeout,
+ TimeUnit.MILLISECONDS,
+ getMainThreadExecutor());
+ clientAlivenessFuture.whenComplete(
+ (t, throwable) -> {
+ if (throwable != null) {
+ if (throwable instanceof TimeoutException) {
+ log.warn(
+ "Haven't receive aliveness from job client and cancel the job {}.",
+ jobId);
+ cancelJob(jobId, webTimeout);
Review Comment:
The future-based approach seems like overkill for this feature. It creates a
new future for each job on every heartbeat, which seems expensive.
I'd suggest simply recording the last heartbeat timestamp of each job and
periodically checking whether any job has timed out. This is similar to how we
check and release idle TMs. It may not trigger the timeout at precisely the
configured time, but it would be much cheaper.
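A rough sketch of what I mean, in plain Java rather than actual Dispatcher code. All names here (`ClientHeartbeatTracker`, `reportHeartbeat`, `removeExpired`) are hypothetical, job IDs are plain strings for brevity, and the current time is passed in explicitly so the check is easy to test. It assumes all calls happen on a single thread (e.g. the main thread executor), so a plain `HashMap` is enough.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: tracks the last client heartbeat per job. Not Flink code. */
final class ClientHeartbeatTracker {

    private final long timeoutMillis;
    // jobId -> timestamp (millis) of the most recent heartbeat.
    private final Map<String, Long> lastHeartbeat = new HashMap<>();

    ClientHeartbeatTracker(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    /** Called on every heartbeat: a single map update, no new future. */
    void reportHeartbeat(String jobId, long nowMillis) {
        lastHeartbeat.put(jobId, nowMillis);
    }

    /**
     * Called periodically. Returns the jobs whose last heartbeat is older than
     * the timeout and stops tracking them, so each job is reported only once.
     */
    List<String> removeExpired(long nowMillis) {
        List<String> expired = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = lastHeartbeat.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (nowMillis - entry.getValue() > timeoutMillis) {
                expired.add(entry.getKey());
                it.remove();
            }
        }
        return expired;
    }
}
```

The periodic check would then cancel every job returned by `removeExpired`. With a check interval of `d`, a job is cancelled at most `d` after its timeout elapses, which is the imprecision mentioned above.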
##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -1016,6 +1034,47 @@ public CompletableFuture<CoordinationResponse>
deliverCoordinationRequestToCoord
operatorId, serializedRequest, timeout));
}
+ @Override
+ public CompletableFuture<Void> reportJobClientHeartbeat(JobID jobId, Time timeout) {
+ if (!getJobManagerRunner(jobId).isPresent()) {
+ log.warn("Fail to find job {} for client.", jobId);
+ } else {
+ log.debug("Job {} receives client's heartbeat.", jobId);
+ markJobClientAliveness(jobId);
+ resetJobClientAlivenessCheck(jobId);
+ }
+ return CompletableFuture.completedFuture(null);
+ }
+
+ private void markJobClientAliveness(JobID jobId) {
+ if (jobClientAlivenessFutures.containsKey(jobId)) {
+ jobClientAlivenessFutures.get(jobId).complete(null);
+ }
+ }
+
+ private void resetJobClientAlivenessCheck(JobID jobId) {
+ CompletableFuture<Void> clientAlivenessFuture = new CompletableFuture<>();
+ jobClientAlivenessFutures.put(jobId, clientAlivenessFuture);
+ FutureUtils.orTimeout(
+ clientAlivenessFuture,
+ jobClientHeartbeatTimeout,
+ TimeUnit.MILLISECONDS,
+ getMainThreadExecutor());
+ clientAlivenessFuture.whenComplete(
+ (t, throwable) -> {
+ if (throwable != null) {
+ if (throwable instanceof TimeoutException) {
+ log.warn(
+ "Haven't receive aliveness from job client and cancel the job {}.",
+ jobId);
+ cancelJob(jobId, webTimeout);
Review Comment:
What happens if something goes wrong while canceling the job, e.g., the
cancellation itself times out?