zhuzhurk commented on code in PR #24582:
URL: https://github.com/apache/flink/pull/24582#discussion_r1590963000
##########
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java:
##########
@@ -1368,7 +1383,16 @@ private void disconnectAndTryReconnectToJobManager(
JobTable.Connection jobManagerConnection, Exception cause) {
try (MdcCloseable ignored =
MdcUtils.withContext(MdcUtils.asContextData(jobManagerConnection.getJobId()))) {
- disconnectJobManagerConnection(jobManagerConnection, cause);
+ disconnectJobManagerConnection(jobManagerConnection, cause, false);
+ jobLeaderService.reconnect(jobManagerConnection.getJobId());
+ }
+ }
+
+ private void disconnectAndTryReconnectToJobManagerPossibleRetainPartitions(
Review Comment:
I'd prefer to name this `disconnectAndTryReconnectToJobManager` and have it
accept a `releasePartitions` param.
The existing `disconnectAndTryReconnectToJobManager()` method can then
delegate to it.
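
A minimal sketch of what this could look like. `ReconnectSketch` is a hypothetical stand-in for `TaskExecutor`: plain `String` job ids replace `JobTable.Connection`, and the `calls` list exists only to make the delegation visible. Defaulting the two-argument form to `releasePartitions = true` assumes it mirrors the `false` currently passed for `possibleRetainPartitions`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for TaskExecutor; not the actual Flink class.
class ReconnectSketch {
    // Records calls so the delegation can be observed in this sketch.
    final List<String> calls = new ArrayList<>();

    // Existing entry point: same signature as before, still releases partitions.
    void disconnectAndTryReconnectToJobManager(String jobId, Exception cause) {
        disconnectAndTryReconnectToJobManager(jobId, cause, true);
    }

    // Overload with the extra flag: the caller decides whether to release.
    void disconnectAndTryReconnectToJobManager(
            String jobId, Exception cause, boolean releasePartitions) {
        disconnectJobManagerConnection(jobId, cause, releasePartitions);
        reconnect(jobId);
    }

    // Placeholder for the real disconnect logic.
    void disconnectJobManagerConnection(
            String jobId, Exception cause, boolean releasePartitions) {
        calls.add("disconnect:" + jobId + ":release=" + releasePartitions);
    }

    // Placeholder for jobLeaderService.reconnect(jobId).
    void reconnect(String jobId) {
        calls.add("reconnect:" + jobId);
    }
}
```

With this shape the separate `...PossibleRetainPartitions` method is no longer needed, and existing callers of the two-argument form stay unchanged.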
##########
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java:
##########
@@ -1842,14 +1869,18 @@ private void closeJob(JobTable.Job job, Exception cause) {
job.asConnection()
.ifPresent(
jobManagerConnection ->
- disconnectJobManagerConnection(jobManagerConnection, cause));
+ disconnectJobManagerConnection(jobManagerConnection, cause, false));
job.close();
}
private void disconnectJobManagerConnection(
- JobTable.Connection jobManagerConnection, Exception cause) {
+ JobTable.Connection jobManagerConnection,
+ Exception cause,
+ boolean possibleRetainPartitions) {
Review Comment:
Maybe rename `possibleRetainPartitions` to `releasePartitions` and invert
all the current checks and the values passed at the call sites accordingly.
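
A rough sketch of the inverted flag, under assumptions: `ReleaseFlagSketch` and `trackedPartitions` are hypothetical, and clearing the set stands in for whatever the real release logic does. The point is only that the `if` branches swap and each call site flips its literal (the `false` passed today becomes `true`).

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical model of the partition bookkeeping; not the actual Flink code.
class ReleaseFlagSketch {
    // Stand-in for the partitions tracked on behalf of a job.
    final Set<String> trackedPartitions = new HashSet<>();

    // Before the rename: if (possibleRetainPartitions) { retain } else { release }.
    // After the rename: the condition reads positively and the branches swap.
    void disconnectJobManagerConnection(String jobId, boolean releasePartitions) {
        if (releasePartitions) {
            // Default path: drop everything tracked for the job.
            trackedPartitions.clear();
        }
        // Otherwise the partitions stay tracked so that a recovering
        // JobManager can pick them up again.
    }
}
```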
##########
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java:
##########
@@ -2474,16 +2532,19 @@ public void jobManagerLostLeadership(final JobID jobId, final JobMasterId jobMas
"JobManager for job {} with leader id {} lost leadership.", jobId, jobMasterId);
runAsync(
- () ->
- jobTable.getConnection(jobId)
- .ifPresent(
- jobManagerConnection ->
- disconnectJobManagerConnection(
- jobManagerConnection,
- new Exception(
- "Job leader for job id "
- + jobId
- + " lost leadership."))));
Review Comment:
Why not just pass `shouldRetainPartitionsOnJobManagerConnectionLost()` in
as the param?
##########
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java:
##########
@@ -1886,6 +1917,35 @@ private void disconnectJobManagerConnection(
}
}
+ if (possibleRetainPartitions) {
+ // this branch is for job recovery
Review Comment:
Please reword the comment: job recovery -> job recovery from master failures
##########
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java:
##########
@@ -2650,11 +2711,16 @@ public void notifyTargetUnreachable(ResourceID resourceID) {
private void handleJobManagerConnectionLoss(ResourceID resourceID, Exception cause) {
validateRunsInMainThread();
- jobTable.getConnection(resourceID)
- .ifPresent(
- jobManagerConnection ->
- disconnectAndTryReconnectToJobManager(
- jobManagerConnection, cause));
Review Comment:
Maybe just pass in `shouldRetainPartitionsOnJobManagerConnectionLost()` as a
boolean param. See my other comments.
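
Roughly what I have in mind, as a sketch only. `ConnectionLossSketch` is hypothetical, the `BooleanSupplier` stands in for `shouldRetainPartitionsOnJobManagerConnectionLost()`, and the negation assumes the flag has been renamed to `releasePartitions` as suggested in my other comment. Without that rename, the value would be passed through unchanged.

```java
import java.util.function.BooleanSupplier;

// Hypothetical stand-in for TaskExecutor; not the actual Flink class.
class ConnectionLossSketch {
    // Stand-in for shouldRetainPartitionsOnJobManagerConnectionLost().
    private final BooleanSupplier shouldRetainPartitions;

    // Recorded only so that the sketch's behavior can be checked.
    int disconnectCount = 0;
    boolean lastReleasePartitions;

    ConnectionLossSketch(BooleanSupplier shouldRetainPartitions) {
        this.shouldRetainPartitions = shouldRetainPartitions;
    }

    void handleJobManagerConnectionLoss(String jobId, Exception cause) {
        // One call, no if/else: the decision travels as a boolean argument.
        disconnectAndTryReconnectToJobManager(
                jobId, cause, !shouldRetainPartitions.getAsBoolean());
    }

    // Placeholder for the real disconnect-and-reconnect logic.
    void disconnectAndTryReconnectToJobManager(
            String jobId, Exception cause, boolean releasePartitions) {
        disconnectCount++;
        lastReleasePartitions = releasePartitions;
    }
}
```

The same pattern would apply in `jobManagerLostLeadership`.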