wmedvede commented on code in PR #2214:
URL: https://github.com/apache/incubator-kie-kogito-apps/pull/2214#discussion_r2093553471
##########
jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/job/DelegateJob.java:
##########
@@ -45,47 +38,56 @@ public class DelegateJob implements Job<JobDetailsContext> {
private static final Logger LOGGER = LoggerFactory.getLogger(DelegateJob.class);
- private final JobExecutorResolver jobExecutorResolver;
+ private JobExecutorResolver jobExecutorResolver;
- ReactiveJobScheduler scheduler;
+ private JobScheduler<JobDetails> scheduler;
- public DelegateJob(JobExecutorResolver executorResolver, ReactiveJobScheduler scheduler) {
+ public DelegateJob(JobExecutorResolver executorResolver, JobScheduler<JobDetails> scheduler) {
this.jobExecutorResolver = executorResolver;
this.scheduler = scheduler;
}
@Override
public void execute(JobDetailsContext ctx) {
- final AtomicReference<JobExecutionResponse> executionResponse = new AtomicReference<>();
- final JobDetails jobDetails = requireNonNull(ctx.getJobDetails(), () -> String.format("JobDetails cannot be null for context: %s", ctx));
- final JobExecutor executor = requireNonNull(jobExecutorResolver.get(jobDetails), () -> String.format("No JobExecutor was found for jobDetails: %s", jobDetails));
- LOGGER.info("Executing job for context: {}", jobDetails);
- executor.execute(jobDetails)
- .flatMap(response -> {
- executionResponse.set(response);
- return handleJobExecutionSuccess(response);
- })
- .onFailure(JobExecutionException.class).recoverWithUni(ex -> {
- String jobId = ((JobExecutionException) ex).getJobId();
- executionResponse.set(JobExecutionResponse.builder()
- .message(ex.getMessage())
- .now()
- .jobId(jobId)
- .build());
- return handleJobExecutionError(executionResponse.get());
- })
- // avoid blocking IO pool from the event-loop since alternative EmbeddedJobExecutor is blocking.
- .runSubscriptionOn(Infrastructure.getDefaultWorkerPool())
- .subscribe().with(ignore -> LOGGER.info("Job execution response processing has finished: {}", executionResponse.get()));
+ JobDetails jobDetails = requireNonNull(ctx.getJobDetails(), () -> String.format("JobDetails cannot be null for context: %s", ctx));
+ try {
+ JobExecutor executor = jobExecutorResolver.get(jobDetails);
+ executor = requireNonNull(executor, () -> String.format("No JobExecutor was found for jobDetails: %s", jobDetails));
+ LOGGER.trace("Executing job for context: {}", jobDetails);
+ JobExecutionResponse response = executor.execute(jobDetails);
+ LOGGER.trace("Job execution response processing has finished: {}", response);
+ handleJobExecutionSuccess(response);
+ } catch (JobExecutionException ex) {
+ LOGGER.error("Executing job error: {}", ex.getMessage());
+ JobExecutionResponse errorResponse = JobExecutionResponse.builder()
+ .message(ex.getMessage())
+ .now()
+ .jobId(jobDetails.getId())
+ .code("500")
+ .build();
+
+ handleJobExecutionError(errorResponse);
+ } catch (Exception ex) {
+ LOGGER.error("Unexpected error during the job execution: {}", ex.getMessage());
Review Comment:
I think no exceptions are thrown here
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]