zhuzhurk commented on code in PR #27362:
URL: https://github.com/apache/flink/pull/27362#discussion_r2647889934
##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -691,6 +716,19 @@ private CompletableFuture<Acknowledge> internalSubmitJob(ExecutionPlan execution
executionPlan.getJobID()));
}
+ private Optional<AbstractApplication> getApplicationForJob(ExecutionPlan executionPlan) {
Review Comment:
I guess this method should be used in `internalSubmitJob`?
##########
flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutor.java:
##########
@@ -211,8 +212,15 @@ private static CompletableFuture<JobID> submitJob(
throw new CompletionException(e);
}
- return dispatcherGateway.submitJob(streamGraph, rpcTimeout);
+ return internalSubmit(dispatcherGateway, streamGraph, rpcTimeout);
})
.thenApply(ack -> streamGraph.getJobID());
}
+
+ protected CompletableFuture<Acknowledge> internalSubmit(
Review Comment:
This can be package-private.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -1036,7 +1086,23 @@ private CompletableFuture<ArchivedApplication> requestApplication(
List<CompletableFuture<JobDetails>> jobDetailsFutures =
application.getJobs().stream()
- .map(jobId -> requestJobDetails(jobId, timeout))
+ .map(
+ jobId ->
+ requestJobDetails(jobId, timeout)
+ .exceptionally(
+ t -> {
+ if (t instanceof FlinkJobNotFoundException) {
+ log.warn(
+ "Ignore job {} which may be expired when requesting application {}",
Review Comment:
It seems odd that a job can expire from the history while its hosting
application has not expired.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -728,6 +766,17 @@ private void persistAndRunJob(ExecutionPlan executionPlan) throws Exception {
private JobManagerRunner createJobMasterRunner(ExecutionPlan executionPlan) throws Exception {
checkState(!jobManagerRunnerRegistry.isRegistered(executionPlan.getJobID()));
+
+ JobStatusListener singleJobApplication = null;
+ Optional<AbstractApplication> optionalApplication =
+ executionPlan.getApplicationId().map(applications::get);
+ if (optionalApplication.isPresent()) {
+ AbstractApplication application = optionalApplication.get();
+ if (application instanceof JobStatusListener) {
Review Comment:
I think the clearer logic would be: if an application is a
`SingleJobApplication`, it needs to be passed to the JobManager as a
`JobStatusListener`.
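A minimal sketch of that check, using simplified stand-in types rather than the real Flink classes. The one-method `JobStatusListener`, the `MultiJobApplication` type, and the `listenerFor` helper are all hypothetical and exist only to illustrate the idea; the sketch also assumes that `SingleJobApplication` implements `JobStatusListener`, as the diff implies.

```java
import java.util.Optional;

class ListenerSelectionSketch {
    // Simplified stand-in for illustration only; not the real Flink interface.
    interface JobStatusListener {
        void jobStatusChanges(String jobId, String newStatus);
    }

    // Stand-in for the application base type.
    abstract static class AbstractApplication {}

    // Assumption: a single-job application is itself a job status listener.
    static class SingleJobApplication extends AbstractApplication implements JobStatusListener {
        String lastStatus;

        @Override
        public void jobStatusChanges(String jobId, String newStatus) {
            lastStatus = newStatus;
        }
    }

    // Hypothetical application type that hosts several jobs.
    static class MultiJobApplication extends AbstractApplication {}

    // Hypothetical helper: the decision is keyed on the application type,
    // not on whether the object happens to implement the listener interface.
    static Optional<JobStatusListener> listenerFor(AbstractApplication application) {
        if (application instanceof SingleJobApplication) {
            return Optional.of((JobStatusListener) application);
        }
        return Optional.empty();
    }
}
```

Checking for `SingleJobApplication` directly makes the intent explicit: other application types would not be registered as listeners even if they later implemented the interface for unrelated reasons.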
##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobManagerRunnerFactory.java:
##########
@@ -45,6 +46,7 @@ JobManagerRunner createJobManagerRunner(
JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory,
FatalErrorHandler fatalErrorHandler,
Collection<FailureEnricher> failureEnrichers,
+ JobStatusListener singleJobApplication,
Review Comment:
singleJobApplication -> jobStatusListener
The handler should simply treat it as a `JobStatusListener` instead of a
`SingleJobApplication`.
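As a rough illustration of the rename, here is a sketch with stand-in types (not the real `JobManagerRunnerFactory` signature). `Runner`, `createRunner`, and the two-argument `JobStatusListener` are hypothetical; the point is only that the receiving side depends on the listener role and never on the caller's concrete type.

```java
class RunnerFactorySketch {
    // Simplified stand-in for illustration only; not the real Flink interface.
    interface JobStatusListener {
        void jobStatusChanges(String jobId, String newStatus);
    }

    // Hypothetical runner that only knows it was handed a listener.
    static final class Runner {
        private final String jobId;
        private final JobStatusListener jobStatusListener; // may be null

        Runner(String jobId, JobStatusListener jobStatusListener) {
            this.jobId = jobId;
            this.jobStatusListener = jobStatusListener;
        }

        // Notifies the listener, if any, about a status change.
        void transitionTo(String newStatus) {
            if (jobStatusListener != null) {
                jobStatusListener.jobStatusChanges(jobId, newStatus);
            }
        }
    }

    // The parameter is named for its role, not for the type the caller passes in.
    static Runner createRunner(String jobId, JobStatusListener jobStatusListener) {
        return new Runner(jobId, jobStatusListener);
    }
}
```

With this naming, any listener can be supplied, for example a lambda in a test, without the factory implying that it must be a single-job application.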
##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -418,27 +424,31 @@ private static void assertRecoveredJobsAndDirtyJobResults(
"There should be no overlap between the recovered ExecutionPlans and the passed dirty JobResults based on their job ID.");
}
- private void startRecoveredJobs() {
+ private void startRecoveredJobs(boolean wrapIntoApplication) {
for (ExecutionPlan recoveredJob : recoveredJobs) {
- runRecoveredJob(recoveredJob);
+ runRecoveredJob(recoveredJob, wrapIntoApplication);
}
recoveredJobs.clear();
}
- private void runRecoveredJob(final ExecutionPlan recoveredJob) {
+ private void runRecoveredJob(
+ final ExecutionPlan recoveredJob, final boolean wrapIntoApplication) {
Review Comment:
Could you add some comments to explain the behavior with regard to
`wrapIntoApplication`?