zhuzhurk commented on code in PR #24582:
URL: https://github.com/apache/flink/pull/24582#discussion_r1560848973
##########
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java:
##########
@@ -1886,6 +1911,33 @@ private void disconnectJobManagerConnection(
}
}
+ if (cleanupPartitionLater) {
+ // this branch is for job recovery
+ final Duration maxRegistrationDuration =
+ taskManagerConfiguration.getMaxRegistrationDuration();
+
+ if (maxRegistrationDuration != null) {
+ log.info(
+ "Waiting for {} mills for job {} to recover. If there is no reconnection, "
Review Comment:
"there is no reconnection" -> "the job manager is not reconnected"
##########
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java:
##########
@@ -409,6 +410,12 @@ public TaskExecutor(
resourceId, new JobManagerHeartbeatListener(),
getMainThreadExecutor(), log);
}
+ private boolean isJobRecoveryEnabled() {
Review Comment:
Better to rename it to `shouldRetainPartitionsOnJobManagerConnectionLost`.
And it should return `true` only if the Netty shuffle is used.
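
A minimal sketch of what the renamed predicate could look like, assuming the decision depends on a job-recovery switch plus the configured shuffle service factory. `RetentionPolicy`, its constructor parameters, and the string comparison against the Netty factory class name are hypothetical stand-ins for illustration, not the actual `TaskExecutor` code:

```java
/** Hypothetical stand-in for the settings the TaskExecutor would consult. */
final class RetentionPolicy {
    // Assumption: class name of the default Netty shuffle service factory.
    static final String NETTY_SHUFFLE_FACTORY =
            "org.apache.flink.runtime.io.network.NettyShuffleServiceFactory";

    private final boolean jobRecoveryEnabled;
    private final String shuffleServiceFactoryClass;

    RetentionPolicy(boolean jobRecoveryEnabled, String shuffleServiceFactoryClass) {
        this.jobRecoveryEnabled = jobRecoveryEnabled;
        this.shuffleServiceFactoryClass = shuffleServiceFactoryClass;
    }

    /** True only if job recovery is enabled AND the Netty shuffle is in use. */
    boolean shouldRetainPartitionsOnJobManagerConnectionLost() {
        return jobRecoveryEnabled
                && NETTY_SHUFFLE_FACTORY.equals(shuffleServiceFactoryClass);
    }
}
```

With this shape, the name states the effect (partitions are retained) rather than the feature flag, and a custom shuffle service never takes the retention path.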
##########
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java:
##########
@@ -2474,16 +2523,21 @@ public void jobManagerLostLeadership(final JobID jobId, final JobMasterId jobMas
"JobManager for job {} with leader id {} lost leadership.", jobId, jobMasterId);
runAsync(
- () ->
- jobTable.getConnection(jobId)
- .ifPresent(
- jobManagerConnection ->
- disconnectJobManagerConnection(
- jobManagerConnection,
- new Exception(
- "Job leader for job id "
- + jobId
- + " lost leadership."))));
+ () -> {
+ Optional<JobTable.Connection> connection = jobTable.getConnection(jobId);
+
+ if (connection.isPresent()) {
+ Exception cause =
+ new Exception(
+ "Job leader for job id " + jobId + " lost leadership.");
+ if (isJobRecoveryEnabled()) {
+ disconnectJobManagerConnectionAndCleanupPartitionLater(
Review Comment:
Could you confirm whether it is possible to reconnect to the old JM? If that can
happen, the retained partitions will be leaked.
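
To make the concern concrete, here is a minimal sketch of one possible guard, assuming the TaskExecutor can compare the leader id of the reconnecting JM with the one it lost. `RetainedPartitions` and all of its methods are hypothetical, and whether releasing on a reconnect to the old JM is the right policy is exactly the open question:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

/** Hypothetical bookkeeping for partitions retained after a lost JM connection. */
final class RetainedPartitions {
    /** What was retained for a job, and which leader the TM had been connected to. */
    private static final class Entry {
        final UUID lostLeaderId;
        final Set<String> partitionIds;

        Entry(UUID lostLeaderId, Set<String> partitionIds) {
            this.lostLeaderId = lostLeaderId;
            this.partitionIds = partitionIds;
        }
    }

    private final Map<String, Entry> retainedByJob = new HashMap<>();

    void retain(String jobId, UUID lostLeaderId, Set<String> partitionIds) {
        retainedByJob.put(jobId, new Entry(lostLeaderId, partitionIds));
    }

    /** Returns the partitions that should be released right away (possibly none). */
    Set<String> onJobManagerConnected(String jobId, UUID leaderId) {
        Entry entry = retainedByJob.remove(jobId);
        if (entry == null) {
            return Collections.emptySet();
        }
        if (entry.lostLeaderId.equals(leaderId)) {
            // Assumption: the old JM will not take over the retained partitions,
            // so keeping them would leak them.
            return entry.partitionIds;
        }
        // A new leader connected: it is expected to take over the partitions.
        return Collections.emptySet();
    }
}
```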
##########
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java:
##########
@@ -1373,6 +1380,12 @@ private void disconnectAndTryReconnectToJobManager(
}
}
+ private void disconnectAndTryReconnectToJobManagerAndCleanupPartitionLater(
+ JobTable.Connection jobManagerConnection, Exception cause) {
+ disconnectJobManagerConnectionAndCleanupPartitionLater(jobManagerConnection, cause);
Review Comment:
I would prefer to name it `disconnectAndTryReconnectToJobManager` and add a
comment explaining that it does not clean up partitions right away. The partitions
get cleared only if the reconnection cannot be done.
Supporting JM reconnection is the natural behavior in this case, so we do not need
to put all the details into the method name. Also, `CleanupPartitionLater` may not happen.
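
A rough sketch of the behavior such a comment would describe, assuming cleanup is driven by a timer that is cancelled when the JM reconnects in time. `DeferredPartitionCleanup` and its methods are hypothetical and only illustrate the "clean up only if the reconnection does not happen" contract:

```java
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper; not part of Flink. */
final class DeferredPartitionCleanup implements AutoCloseable {
    private final ScheduledExecutorService timer =
            Executors.newSingleThreadScheduledExecutor();
    private final Map<String, ScheduledFuture<?>> pending = new ConcurrentHashMap<>();

    /**
     * Disconnects and waits for a reconnection. Partitions are NOT cleaned up here;
     * they are released only if no reconnection happens within {@code maxWait}.
     */
    void disconnectAndTryReconnect(String jobId, Duration maxWait, Runnable releasePartitions) {
        ScheduledFuture<?> cleanup =
                timer.schedule(releasePartitions, maxWait.toMillis(), TimeUnit.MILLISECONDS);
        ScheduledFuture<?> previous = pending.put(jobId, cleanup);
        if (previous != null) {
            previous.cancel(false); // replace an older pending cleanup for this job
        }
    }

    /** Returns true if a pending cleanup was cancelled, i.e. the partitions are kept. */
    boolean onReconnected(String jobId) {
        ScheduledFuture<?> cleanup = pending.remove(jobId);
        return cleanup != null && cleanup.cancel(false);
    }

    @Override
    public void close() {
        timer.shutdownNow();
    }
}
```

Since the cleanup is conditional, a name ending in `CleanupPartitionLater` would promise something that often does not happen, while a Javadoc like the one above can state the condition precisely.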