martinweiler commented on code in PR #2214:
URL: 
https://github.com/apache/incubator-kie-kogito-apps/pull/2214#discussion_r2093564906


##########
jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/job/DelegateJob.java:
##########
@@ -45,47 +38,56 @@ public class DelegateJob implements Job<JobDetailsContext> {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(DelegateJob.class);
 
-    private final JobExecutorResolver jobExecutorResolver;
+    private JobExecutorResolver jobExecutorResolver;
 
-    ReactiveJobScheduler scheduler;
+    private JobScheduler<JobDetails> scheduler;
 
-    public DelegateJob(JobExecutorResolver executorResolver, 
ReactiveJobScheduler scheduler) {
+    public DelegateJob(JobExecutorResolver executorResolver, 
JobScheduler<JobDetails> scheduler) {
         this.jobExecutorResolver = executorResolver;
         this.scheduler = scheduler;
     }
 
     @Override
     public void execute(JobDetailsContext ctx) {
-        final AtomicReference<JobExecutionResponse> executionResponse = new 
AtomicReference<>();
-        final JobDetails jobDetails = requireNonNull(ctx.getJobDetails(), () 
-> String.format("JobDetails cannot be null for context: %s", ctx));
-        final JobExecutor executor = 
requireNonNull(jobExecutorResolver.get(jobDetails), () -> String.format("No 
JobExecutor was found for jobDetails: %s", jobDetails));
-        LOGGER.info("Executing job for context: {}", jobDetails);
-        executor.execute(jobDetails)
-                .flatMap(response -> {
-                    executionResponse.set(response);
-                    return handleJobExecutionSuccess(response);
-                })
-                .onFailure(JobExecutionException.class).recoverWithUni(ex -> {
-                    String jobId = ((JobExecutionException) ex).getJobId();
-                    executionResponse.set(JobExecutionResponse.builder()
-                            .message(ex.getMessage())
-                            .now()
-                            .jobId(jobId)
-                            .build());
-                    return handleJobExecutionError(executionResponse.get());
-                })
-                // avoid blocking IO pool from the event-loop since 
alternative EmbeddedJobExecutor is blocking.
-                .runSubscriptionOn(Infrastructure.getDefaultWorkerPool())
-                .subscribe().with(ignore -> LOGGER.info("Job execution 
response processing has finished: {}", executionResponse.get()));
+        JobDetails jobDetails = requireNonNull(ctx.getJobDetails(), () -> 
String.format("JobDetails cannot be null for context: %s", ctx));
+        try {
+            JobExecutor executor = jobExecutorResolver.get(jobDetails);
+            executor = requireNonNull(executor, () -> String.format("No 
JobExecutor was found for jobDetails: %s", jobDetails));
+            LOGGER.trace("Executing job for context: {}", jobDetails);
+            JobExecutionResponse response = executor.execute(jobDetails);
+            LOGGER.trace("Job execution response processing has finished: {}", 
response);
+            handleJobExecutionSuccess(response);
+        } catch (JobExecutionException ex) {
+            LOGGER.error("Executing job error: {}", ex.getMessage());
+            JobExecutionResponse errorResponse = JobExecutionResponse.builder()
+                    .message(ex.getMessage())
+                    .now()
+                    .jobId(jobDetails.getId())
+                    .code("500")
+                    .build();
+
+            handleJobExecutionError(errorResponse);
+        } catch (Exception ex) {
+            LOGGER.error("Unexpected error during the job execution: {}", 
ex.getMessage());
+            JobExecutionResponse errorResponse = JobExecutionResponse.builder()
+                    .message(ex.getMessage())
+                    .now()
+                    .jobId(jobDetails.getId())
+                    .code("500")
+                    .build();
+
+            handleJobExecutionError(errorResponse);
+        }
+
     }
 
-    public Uni<JobDetails> handleJobExecutionSuccess(JobExecutionResponse 
response) {
-        LOGGER.debug("Job execution success response received: {}", response);
-        return 
Uni.createFrom().publisher(publisher(ErrorHandling.skipErrorPublisherBuilder(scheduler::handleJobExecutionSuccess,
 response).buildRs()));
+    public JobDetails handleJobExecutionSuccess(JobExecutionResponse response) 
{
+        LOGGER.info("Job execution success response received: {}", response);

Review Comment:
   Please change back to `debug` to avoid too much logging at runtime



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to