zhuzhurk commented on code in PR #27686:
URL: https://github.com/apache/flink/pull/27686#discussion_r2867182640
##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -334,9 +338,21 @@ protected Dispatcher(
        this.dispatcherBootstrapFactory = checkNotNull(dispatcherBootstrapFactory);
-       this.recoveredJobs = new HashSet<>(recoveredJobs);
+       for (ExecutionPlan executionPlan : recoveredJobs) {
+           final JobID jobId = executionPlan.getJobID();
+           final ApplicationID applicationId = executionPlan.getApplicationId().orElse(null);
Review Comment:
In what case can the `applicationId` be null?
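To illustrate the concern (a minimal sketch, not Flink code: `Optional<String>` stands in for `Optional<ApplicationID>`, and the fail-fast variant is only a suggestion): if a recovered job is always expected to belong to an application, `orElse(null)` silently hides a broken invariant, whereas `orElseThrow` would surface it immediately.

```java
import java.util.Optional;

public class NullApplicationIdSketch {
    // Hypothetical stand-in for executionPlan.getApplicationId() on a
    // recovered job that carries no application id.
    static Optional<String> getApplicationId() {
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Behavior from the diff: null silently flows into later lookups.
        String lenient = getApplicationId().orElse(null);
        System.out.println(lenient == null); // prints: true

        // Possible alternative if null should never happen: fail fast.
        try {
            getApplicationId()
                    .orElseThrow(
                            () -> new IllegalStateException("recovered job has no application id"));
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // prints: recovered job has no application id
        }
    }
}
```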
##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -664,11 +713,16 @@ private CompletableFuture<Acknowledge> internalSubmitApplication(
        log.info("Submitting application '{}' ({}).", application.getName(), applicationId);
        applications.put(applicationId, application);
-       Set<JobID> jobs = recoveredApplicationJobIds.remove(applicationId);
-       if (jobs != null) {
-           jobs.forEach(application::addJob);
-       }
        application.registerStatusListener(this);
+       applicationTerminationFutures.put(applicationId, new CompletableFuture<>());
+
+       // cleanup dirty job results for the application
Review Comment:
Could you explain why the cleanup is done when submitting an application?
##########
flink-clients/src/main/java/org/apache/flink/client/deployment/application/PackagedProgramApplication.java:
##########
@@ -573,12 +584,12 @@ private void runApplicationEntryPoint(
getApplicationId(),
getAllRecoveredJobInfos());
- if (applicationJobIds.isEmpty()) {
+ if (submittedJobIds.isEmpty()) {
Review Comment:
IIUC, all jobs, including the newly submitted ones, the recovered ones, and
the terminal ones, will be added to this list? If so, the old name
`applicationJobIds` may be more accurate.
It would also be good to add a comment describing what the list contains
after `executeProgram(...)` returns.
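To make the suggestion concrete, a rough sketch of the documenting comment the field could carry (the wording and field name are only suggestions; `String` stands in for `JobID`):

```java
import java.util.ArrayList;
import java.util.List;

public class ApplicationJobIdsSketch {
    /**
     * IDs of all jobs of this application. After executeProgram(...) returns,
     * this list contains the newly submitted jobs, the jobs recovered from a
     * previous attempt, and the jobs that already reached a terminal state.
     */
    final List<String> applicationJobIds = new ArrayList<>();

    public static void main(String[] args) {
        ApplicationJobIdsSketch sketch = new ApplicationJobIdsSketch();
        // an empty list then means the user program produced no jobs at all
        System.out.println(sketch.applicationJobIds.isEmpty()); // prints: true
    }
}
```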
##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java:
##########
@@ -493,7 +524,9 @@ private static ArchivedExecutionGraph
createSparseArchivedExecutionGraph(
jobStatus == JobStatus.FAILED || jobStatus ==
JobStatus.SUSPENDED);
long failureTime = System.currentTimeMillis();
failureInfo = new ErrorInfo(throwable, failureTime);
- timestamps[jobStatus.ordinal()] = failureTime;
+ if (endTimestamp < 0) {
Review Comment:
Why is the `endTimestamp` of a failed job assumed to be less than 0?
##########
flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutor.java:
##########
@@ -122,18 +145,33 @@ public CompletableFuture<JobClient> execute(
        // Skip resubmission if the job is recovered via HA.
        // When optJobId is present, the streamGraph's ID is deterministically derived from it. In
-       // this case, if the streamGraph's ID is in submittedJobIds, it means the job was submitted
-       // in a previous run and should not be resubmitted.
-       if (optJobId.isPresent() && submittedJobIds.contains(streamGraph.getJobID())) {
-           return getJobClientFuture(streamGraph.getJobID(), userCodeClassloader);
+       // this case, if the streamGraph's ID is in terminalJobIds or submittedJobIds, it means the
+       // job was submitted in a previous run and should not be resubmitted.
+       if (optJobId.isPresent()) {
+           final JobID actualJobId = streamGraph.getJobID();
+           if (terminalJobIds.contains(actualJobId)) {
+               LOG.info("Job {} reached a terminal state in a previous execution.", actualJobId);
+               return getJobClientFuture(actualJobId, userCodeClassloader);
+           }
+
+           if (recoveredJobIds.contains(actualJobId)) {
+               final Duration timeout = configuration.get(ClientOptions.CLIENT_TIMEOUT);
+               return dispatcherGateway
+                       .recoverJob(actualJobId, timeout)
+                       .thenCompose(
+                               ack -> {
+                                   LOG.info("Job {} is recovered successfully.", actualJobId);
+                                   return getJobClientFuture(actualJobId, userCodeClassloader);
+                               });
+           }
+       }
        return submitAndGetJobClientFuture(pipeline, configuration, userCodeClassloader);
    }

    private CompletableFuture<JobClient> getJobClientFuture(
            final JobID jobId, final ClassLoader userCodeClassloader) {
-       LOG.info("Job {} was recovered successfully.", jobId);
+       submittedJobIds.add(jobId);
Review Comment:
The method should be renamed to reflect its new behavior.
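For example (a rough sketch only; the new name is just a suggestion, and `String` stands in for `JobID` and `JobClient`):

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

public class RenameSketch {
    final Set<String> submittedJobIds = new HashSet<>();

    // was: getJobClientFuture -- the old name no longer mentions the new
    // side effect of recording the job as submitted.
    CompletableFuture<String> registerJobAndGetClientFuture(String jobId) {
        submittedJobIds.add(jobId);
        return CompletableFuture.completedFuture("client-for-" + jobId);
    }

    public static void main(String[] args) {
        RenameSketch sketch = new RenameSketch();
        System.out.println(sketch.registerJobAndGetClientFuture("job-1").join()); // prints: client-for-job-1
        System.out.println(sketch.submittedJobIds.contains("job-1")); // prints: true
    }
}
```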
##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -624,6 +648,31 @@ public CompletableFuture<Acknowledge> submitJob(ExecutionPlan executionPlan, Dur
                getMainThreadExecutor(jobID));
    }

+   @Override
+   public CompletableFuture<Acknowledge> recoverJob(JobID jobId, Duration timeout) {
+       try (MdcCloseable ignored = MdcUtils.withContext(MdcUtils.asContextData(jobId))) {
+           log.info("Received job recovery request for job {}.", jobId);
+       }
+       final ExecutionPlan executionPlan = recoveredJobs.remove(jobId);
+       if (executionPlan == null) {
+           return FutureUtils.completedExceptionally(
+                   new JobSubmissionException(jobId, "Cannot find the recovered job."));
+       }
+
+       final ApplicationID applicationId = executionPlan.getApplicationId().orElse(null);
+       checkState(recoveredJobIdsByApplicationId.containsKey(applicationId));
+
+       runRecoveredJob(executionPlan, false);
+
+       Set<JobID> jobIds = recoveredJobIdsByApplicationId.get(applicationId);
Review Comment:
I think the name `recoveredJobIdsByApplicationId` is a bit misleading. These
jobs are not `recovered`; they are the jobs that were suspended in the
previous application attempt. Some of them will be recovered, while others
will simply be discarded. Could you refine the name to make it easier to
understand?
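To sketch the idea (the name `previousAttemptJobIdsByApplicationId` is only one possibility; `String` stands in for `ApplicationID` and `JobID`):

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class PreviousAttemptJobsSketch {
    // was: recoveredJobIdsByApplicationId -- these are the jobs left over
    // from the previous application attempt; only some get recovered.
    final Map<String, Set<String>> previousAttemptJobIdsByApplicationId = new HashMap<>();

    void onJobRecovered(String applicationId, String jobId) {
        // a job leaves the "previous attempt" set once it is actually recovered
        Set<String> remaining = previousAttemptJobIdsByApplicationId.get(applicationId);
        if (remaining != null) {
            remaining.remove(jobId);
        }
    }

    public static void main(String[] args) {
        PreviousAttemptJobsSketch sketch = new PreviousAttemptJobsSketch();
        Set<String> jobs = new HashSet<>();
        jobs.add("job-1");
        jobs.add("job-2");
        sketch.previousAttemptJobIdsByApplicationId.put("app-1", jobs);

        sketch.onJobRecovered("app-1", "job-1");
        System.out.println(sketch.previousAttemptJobIdsByApplicationId.get("app-1")); // prints: [job-2]
    }
}
```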
##########
flink-clients/src/test/java/org/apache/flink/client/deployment/application/PackagedProgramApplicationITCase.java:
##########
@@ -195,10 +194,8 @@ public JobResultStore getJobResultStore() {
        awaitClusterStopped(cluster);
    }
-
        FlinkAssertions.assertThatChainOfCauses(ErrorHandlingSubmissionJob.getSubmissionException())
-               .as(
-                       "The job's main method shouldn't have been succeeded due to a DuplicateJobSubmissionException.")
                .hasAtLeastOneElementOfType(DuplicateJobSubmissionException.class);
+       // submission should succeed
Review Comment:
Could you add more comments to explain why it will succeed?
##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -685,12 +739,35 @@ public void notifyApplicationStatusChange(
                "Application %s does not exist.",
                applicationId);
        checkState(
-               !applicationArchivingFutures.containsKey(applicationId),
-               "The application (" + applicationId + ") has already been archived.");
+               !applicationTerminationFutures.get(applicationId).isDone(),
+               "The application (" + applicationId + ") has already terminated.");
+
+       AbstractApplication application = applications.get(applicationId);
+       Set<JobID> remainingRecoveredJobIds =
Review Comment:
The name should be refined, as mentioned in another comment.
##########
flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutorServiceLoader.java:
##########
@@ -55,16 +59,21 @@ public class EmbeddedExecutorServiceLoader implements PipelineExecutorServiceLoa
     */
    public EmbeddedExecutorServiceLoader(
            final Collection<JobID> submittedJobIds,
+           final Collection<JobID> recoveredJobIds,
Review Comment:
The comments need to be updated. IIUC, unlike `submittedJobIds`,
`recoveredJobIds` and `terminalJobIds` are not expected to be modified by the
executor.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]