Copilot commented on code in PR #12971:
URL: https://github.com/apache/cloudstack/pull/12971#discussion_r3044507544


##########
framework/jobs/src/main/java/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java:
##########
@@ -184,6 +184,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
     private volatile long _executionRunNumber = 1;
 
     private final ScheduledExecutorService _heartbeatScheduler = Executors.newScheduledThreadPool(1, new NamedThreadFactory("AsyncJobMgr-Heartbeat"));
+    private final ExecutorService _eventBusPublisher = Executors.newSingleThreadExecutor(new NamedThreadFactory("AsyncJobMgr-EventBus"));
     private ExecutorService _apiJobExecutor;

Review Comment:
   `Executors.newSingleThreadExecutor(...)` uses an unbounded
LinkedBlockingQueue. If `_messageBus.publish()` blocks (the original issue) for
extended periods, this queue can grow without bound, increasing memory pressure
and potentially causing an `OutOfMemoryError` under sustained load. Consider
using a bounded `ThreadPoolExecutor` (still single-threaded) with an explicit
queue size and a rejection policy (e.g., log-and-drop, or `CallerRunsPolicy` as
a backpressure mechanism) that matches the desired delivery guarantees for job
events.
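   A minimal sketch of the bounded alternative, assuming a log-and-drop policy
is acceptable for job events. `BoundedEventBusPublisher`, `create` and `DROPPED`
are hypothetical names, the lambda thread factory stands in for
`NamedThreadFactory`, and `System.err` stands in for the class logger; the
queue capacity is left to the caller because the PR does not specify one.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper (not CloudStack code): a single-threaded executor with a
// bounded queue and a log-and-drop rejection policy.
final class BoundedEventBusPublisher {

    // Number of job events discarded by the rejection handler.
    static final AtomicLong DROPPED = new AtomicLong();

    private BoundedEventBusPublisher() {
    }

    static ThreadPoolExecutor create(int queueCapacity) {
        // Stand-in for new NamedThreadFactory("AsyncJobMgr-EventBus").
        ThreadFactory threadFactory = r -> new Thread(r, "AsyncJobMgr-EventBus");

        // Log-and-drop: record the rejection instead of throwing
        // RejectedExecutionException back into the API call path.
        RejectedExecutionHandler logAndDrop = (task, executor) -> {
            long total = DROPPED.incrementAndGet();
            String reason = executor.isShutdown() ? "executor shut down" : "queue full";
            // In AsyncJobManagerImpl this would be logger.warn(...).
            System.err.println("Dropped job event (" + reason + ", capacity "
                    + queueCapacity + "); total dropped=" + total);
        };

        // core = max = 1 preserves the single-threaded, in-order delivery
        // of Executors.newSingleThreadExecutor().
        return new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueCapacity), threadFactory, logAndDrop);
    }
}
```

   If backpressure is preferred over dropping, passing
`new ThreadPoolExecutor.CallerRunsPolicy()` as the handler runs the publish on
the submitting thread when the queue is full. That bounds memory, but it
reintroduces blocking in the caller, which is what this PR set out to avoid.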



##########
framework/jobs/src/main/java/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java:
##########
@@ -1378,6 +1379,7 @@ public boolean start() {
     @Override
     public boolean stop() {
         _heartbeatScheduler.shutdown();
+        _eventBusPublisher.shutdown();
         _apiJobExecutor.shutdown();
         _workerJobExecutor.shutdown();

Review Comment:
   `stop()` only calls `_eventBusPublisher.shutdown()` and immediately returns. 
Because `NamedThreadFactory` creates non-daemon threads, a slow/stuck publish 
can keep the JVM alive longer than expected during shutdown, and queued events 
may continue to run while other components are already stopping. Consider
awaiting termination with a timeout and then calling `shutdownNow()` (with
logging) to ensure predictable shutdown behavior.
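   A sketch of the bounded-wait shutdown, following the two-phase pattern from
the `ExecutorService` Javadoc. `ExecutorShutdown.shutdownGracefully` is a
hypothetical helper, and the timeout passed from `stop()` (for example
`shutdownGracefully(_eventBusPublisher, 10, TimeUnit.SECONDS)`) is an assumed
value, not one taken from the PR.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: two-phase shutdown for an executor such as _eventBusPublisher.
final class ExecutorShutdown {

    private ExecutorShutdown() {
    }

    /**
     * Rejects new tasks, waits up to {@code timeout} for queued tasks to drain,
     * then interrupts the running task and discards the backlog.
     *
     * @return true if the executor terminated within the allotted waits
     */
    static boolean shutdownGracefully(ExecutorService executor, long timeout, TimeUnit unit) {
        executor.shutdown(); // phase 1: stop accepting, let queued events drain
        try {
            if (executor.awaitTermination(timeout, unit)) {
                return true;
            }
            // Phase 2: interrupt a stuck publish and drop whatever is still queued.
            List<Runnable> discarded = executor.shutdownNow();
            // In AsyncJobManagerImpl this would be logger.warn(...).
            System.err.println("Executor did not stop within " + timeout + " " + unit
                    + "; interrupted it and discarded " + discarded.size() + " queued task(s)");
            // Give the interrupted task a chance to exit; false means it ignored the interrupt.
            return executor.awaitTermination(timeout, unit);
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the caller's interrupt status
            return false;
        }
    }
}
```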



##########
framework/jobs/src/main/java/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java:
##########
@@ -1397,8 +1399,17 @@ protected AsyncJobManagerImpl() {
     }
 
     private void publishOnEventBus(AsyncJob job, String jobEvent) {
-        _messageBus.publish(null, AsyncJob.Topics.JOB_EVENT_PUBLISH, PublishScope.LOCAL,
-            new Pair<AsyncJob, String>(job, jobEvent));
+        try {
+            _eventBusPublisher.submit(new ManagedContextRunnable() {
+                @Override
+                protected void runInContext() {
+                    _messageBus.publish(null, AsyncJob.Topics.JOB_EVENT_PUBLISH, PublishScope.LOCAL,
+                        new Pair<AsyncJob, String>(job, jobEvent));

Review Comment:
   Exceptions thrown by `_messageBus.publish(...)` inside the submitted task 
will be captured by the executor and effectively ignored because the returned 
`Future` is not observed. This can silently drop job events while the API call 
continues normally. Consider wrapping the publish call in a try/catch (at least 
`Exception`, ideally `Throwable`) within `runInContext()` and logging failures 
(optionally including jobId/jobEvent) so operational issues are visible.
   ```suggestion
                       try {
                           _messageBus.publish(null, AsyncJob.Topics.JOB_EVENT_PUBLISH, PublishScope.LOCAL,
                               new Pair<AsyncJob, String>(job, jobEvent));
                       } catch (Throwable t) {
                           logger.warn("Failed to publish async job event on message bus. jobId={}, jobEvent={}",
                               job != null ? job.getId() : null, jobEvent, t);
                       }
   ```
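   The difference between an unobserved `Future` and catching inside the task
can be reproduced in isolation. In this sketch, `SubmitFailureDemo`, `guarded`,
`failureHiddenInFuture` and `LAST_FAILURE` are hypothetical names, and
`System.err` stands in for the class logger used in the suggestion above.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo: where an exception thrown by a submitted task ends up.
final class SubmitFailureDemo {

    // Most recent failure recorded by a guarded task (stand-in for a log line).
    static final AtomicReference<Throwable> LAST_FAILURE = new AtomicReference<>();

    private SubmitFailureDemo() {
    }

    // Catches and records any Throwable inside the task itself, so the failure
    // is visible even though nobody ever calls Future.get().
    static Runnable guarded(Runnable task, String description) {
        return () -> {
            try {
                task.run();
            } catch (Throwable t) {
                LAST_FAILURE.set(t);
                System.err.println("Failed to publish " + description + ": " + t);
            }
        };
    }

    // With submit(), the exception is stored in the Future and only surfaces
    // if get() is called; publishOnEventBus never calls it.
    static Throwable failureHiddenInFuture(ExecutorService executor, Runnable task) {
        Future<?> future = executor.submit(task);
        try {
            future.get();
            return null;
        } catch (ExecutionException e) {
            return e.getCause();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }
}
```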



##########
framework/jobs/src/main/java/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java:
##########
@@ -1397,8 +1399,17 @@ protected AsyncJobManagerImpl() {
     }
 
     private void publishOnEventBus(AsyncJob job, String jobEvent) {
-        _messageBus.publish(null, AsyncJob.Topics.JOB_EVENT_PUBLISH, PublishScope.LOCAL,
-            new Pair<AsyncJob, String>(job, jobEvent));
+        try {
+            _eventBusPublisher.submit(new ManagedContextRunnable() {
+                @Override
+                protected void runInContext() {
+                    _messageBus.publish(null, AsyncJob.Topics.JOB_EVENT_PUBLISH, PublishScope.LOCAL,
+                        new Pair<AsyncJob, String>(job, jobEvent));
+                }
+            });

Review Comment:
   `publishOnEventBus` now captures and publishes the live `AsyncJob` object 
asynchronously. Callers frequently mutate the same `AsyncJobVO` instance after 
calling `publishOnEventBus` (e.g., `updateAsyncJobStatus` mutates `job` inside 
the following transaction), so subscribers may observe a job state that no 
longer corresponds to the `jobEvent` being emitted (notably the "submit" event 
can be delivered with a job already in-progress or completed). To preserve 
event semantics, publish an immutable snapshot (e.g., jobId + jobEvent + 
selected fields copied at enqueue time, or re-fetch a fresh immutable view 
inside the publisher thread).
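   A sketch of the "copy at enqueue time" option. `MutableJob` is a hypothetical
stand-in for `AsyncJobVO`, and `JobEventSnapshot` / `publishAsync` are
illustrative names; which fields subscribers actually need is an assumption.
Note that subscribers currently receive a `Pair<AsyncJob, String>`, so changing
the payload type would also require updating them.

```java
import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.function.Consumer;

// Hypothetical demo: snapshot on the caller's thread, publish the copy asynchronously.
final class JobEventSnapshotDemo {

    private JobEventSnapshotDemo() {
    }

    // Minimal mutable stand-in for AsyncJobVO.
    static final class MutableJob {
        final long id;
        String status;

        MutableJob(long id, String status) {
            this.id = id;
            this.status = status;
        }
    }

    // Immutable payload: all fields are final and copied when the event is enqueued.
    static final class JobEventSnapshot {
        final long jobId;
        final String status;
        final String jobEvent;

        JobEventSnapshot(long jobId, String status, String jobEvent) {
            this.jobId = jobId;
            this.status = status;
            this.jobEvent = Objects.requireNonNull(jobEvent, "jobEvent");
        }

        static JobEventSnapshot of(MutableJob job, String jobEvent) {
            return new JobEventSnapshot(job.id, job.status, jobEvent);
        }
    }

    // The snapshot is taken before the handoff, so later mutations of `job`
    // by the caller cannot change what subscribers observe.
    static void publishAsync(Executor executor, MutableJob job, String jobEvent,
                             Consumer<JobEventSnapshot> subscriber) {
        JobEventSnapshot snapshot = JobEventSnapshot.of(job, jobEvent);
        executor.execute(() -> subscriber.accept(snapshot));
    }
}
```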



-- 
This is an automated message from the Apache Git Service.