eemario commented on code in PR #27686:
URL: https://github.com/apache/flink/pull/27686#discussion_r2871994461
##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -334,9 +338,21 @@ protected Dispatcher(
this.dispatcherBootstrapFactory = checkNotNull(dispatcherBootstrapFactory);
- this.recoveredJobs = new HashSet<>(recoveredJobs);
+ for (ExecutionPlan executionPlan : recoveredJobs) {
+ final JobID jobId = executionPlan.getJobID();
+ final ApplicationID applicationId = executionPlan.getApplicationId().orElse(null);
Review Comment:
A null applicationId can only occur in test cases that submit jobs without an
associated application. I’ll investigate whether there's a better way to handle them.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java:
##########
@@ -493,7 +524,9 @@ private static ArchivedExecutionGraph createSparseArchivedExecutionGraph(
jobStatus == JobStatus.FAILED || jobStatus == JobStatus.SUSPENDED);
long failureTime = System.currentTimeMillis();
failureInfo = new ErrorInfo(throwable, failureTime);
- timestamps[jobStatus.ordinal()] = failureTime;
+ if (endTimestamp < 0) {
Review Comment:
The intent of this change is to use the current timestamp as failureTime
only when endTimestamp is invalid (negative). The code and comments have been
updated for clarity.
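For illustration, the fallback can be sketched as below. This is a minimal sketch, not the code in the PR: `FailureTimestampSketch` and `resolveFailureTime` are hypothetical names, and it assumes that a non-negative `endTimestamp` is used unchanged.

```java
import java.util.function.LongSupplier;

// Hypothetical helper for illustration only; not part of Flink.
final class FailureTimestampSketch {

    /**
     * Returns the timestamp to record for the failed/suspended status.
     * A negative endTimestamp is treated as invalid and replaced by the
     * current time supplied by the clock (assumption: valid values are kept as-is).
     */
    static long resolveFailureTime(long endTimestamp, LongSupplier clock) {
        return endTimestamp < 0 ? clock.getAsLong() : endTimestamp;
    }

    public static void main(String[] args) {
        // Valid end timestamp: kept unchanged.
        System.out.println(resolveFailureTime(1_700_000_000_000L, System::currentTimeMillis));
        // Invalid end timestamp: falls back to the current time.
        System.out.println(resolveFailureTime(-1L, System::currentTimeMillis));
    }
}
```

Passing the clock as a parameter is only a convenience so the fallback can be exercised deterministically in a test.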
##########
flink-clients/src/test/java/org/apache/flink/client/deployment/application/PackagedProgramApplicationITCase.java:
##########
@@ -195,10 +194,8 @@ public JobResultStore getJobResultStore() {
awaitClusterStopped(cluster);
}
- FlinkAssertions.assertThatChainOfCauses(ErrorHandlingSubmissionJob.getSubmissionException())
- .as(
- "The job's main method shouldn't have been succeeded due to a DuplicateJobSubmissionException.")
- .hasAtLeastOneElementOfType(DuplicateJobSubmissionException.class);
+ // submission should succeed
Review Comment:
Updated comments to explain the change.
##########
flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutor.java:
##########
@@ -122,18 +145,33 @@ public CompletableFuture<JobClient> execute(
// Skip resubmission if the job is recovered via HA.
// When optJobId is present, the streamGraph's ID is deterministically derived from it. In
- // this case, if the streamGraph's ID is in submittedJobIds, it means the job was submitted
- // in a previous run and should not be resubmitted.
- if (optJobId.isPresent() && submittedJobIds.contains(streamGraph.getJobID())) {
- return getJobClientFuture(streamGraph.getJobID(), userCodeClassloader);
+ // this case, if the streamGraph's ID is in terminalJobIds or submittedJobIds, it means the
+ // job was submitted in a previous run and should not be resubmitted.
+ if (optJobId.isPresent()) {
+ final JobID actualJobId = streamGraph.getJobID();
+ if (terminalJobIds.contains(actualJobId)) {
+ LOG.info("Job {} reached a terminal state in a previous execution.", actualJobId);
+ return getJobClientFuture(actualJobId, userCodeClassloader);
+ }
+
+ if (recoveredJobIds.contains(actualJobId)) {
+ final Duration timeout = configuration.get(ClientOptions.CLIENT_TIMEOUT);
+ return dispatcherGateway
+ .recoverJob(actualJobId, timeout)
+ .thenCompose(
+ ack -> {
+ LOG.info("Job {} is recovered successfully.", actualJobId);
+ return getJobClientFuture(actualJobId, userCodeClassloader);
+ });
+ }
}
return submitAndGetJobClientFuture(pipeline, configuration, userCodeClassloader);
}
private CompletableFuture<JobClient> getJobClientFuture(
final JobID jobId, final ClassLoader userCodeClassloader) {
- LOG.info("Job {} was recovered successfully.", jobId);
+ submittedJobIds.add(jobId);
Review Comment:
Updated the name.
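For reference, the branching in `execute` shown in the hunk above can be sketched roughly as follows. This is a simplified sketch under stated assumptions: the class, enum, and method names are hypothetical, job IDs are modelled as plain strings, and the method only reports which branch would be taken instead of calling the dispatcher.

```java
import java.util.Set;

// Hypothetical model of the decision in the hunk above; not Flink code.
final class ResubmissionDecisionSketch {

    enum Action {
        ATTACH_TO_TERMINAL_JOB, // job already reached a terminal state in a previous run
        RECOVER_JOB,            // job was recovered via HA and must be recovered first
        SUBMIT_NEW_JOB          // no fixed job ID, or job not seen before
    }

    static Action decide(
            boolean jobIdConfigured,
            String actualJobId,
            Set<String> terminalJobIds,
            Set<String> recoveredJobIds) {
        // The lookups only make sense when the job ID is deterministic.
        if (jobIdConfigured) {
            if (terminalJobIds.contains(actualJobId)) {
                return Action.ATTACH_TO_TERMINAL_JOB;
            }
            if (recoveredJobIds.contains(actualJobId)) {
                return Action.RECOVER_JOB;
            }
        }
        return Action.SUBMIT_NEW_JOB;
    }

    public static void main(String[] args) {
        Set<String> terminal = Set.of("job-a");
        Set<String> recovered = Set.of("job-b");
        System.out.println(decide(true, "job-a", terminal, recovered));
        System.out.println(decide(true, "job-b", terminal, recovered));
        System.out.println(decide(true, "job-c", terminal, recovered));
        System.out.println(decide(false, "job-a", terminal, recovered));
    }
}
```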
##########
flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutorServiceLoader.java:
##########
@@ -55,16 +59,21 @@ public class EmbeddedExecutorServiceLoader implements PipelineExecutorServiceLoa
*/
public EmbeddedExecutorServiceLoader(
final Collection<JobID> submittedJobIds,
+ final Collection<JobID> recoveredJobIds,
Review Comment:
Updated variable names and comments.