Copilot commented on code in PR #12971:
URL: https://github.com/apache/cloudstack/pull/12971#discussion_r3044507544
##########
framework/jobs/src/main/java/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java:
##########
@@ -184,6 +184,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
private volatile long _executionRunNumber = 1;
private final ScheduledExecutorService _heartbeatScheduler = Executors.newScheduledThreadPool(1, new NamedThreadFactory("AsyncJobMgr-Heartbeat"));
+ private final ExecutorService _eventBusPublisher = Executors.newSingleThreadExecutor(new NamedThreadFactory("AsyncJobMgr-EventBus"));
private ExecutorService _apiJobExecutor;
Review Comment:
`Executors.newSingleThreadExecutor(...)` uses an unbounded
LinkedBlockingQueue. If `_messageBus.publish()` blocks for extended periods
(the original issue), the queue can grow without bound under sustained load,
increasing memory pressure and potentially causing an OutOfMemoryError.
Consider using a bounded `ThreadPoolExecutor` (still single-threaded) with an
explicit queue size and a rejection policy (e.g., log+drop, or CallerRuns as a
backpressure mechanism) that matches the desired delivery guarantees for job
events.
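A minimal sketch of that suggestion, assuming log+drop is the chosen policy. `BoundedEventPublisher`, its `DROPPED` counter, and the inline thread factory (a stand-in for `NamedThreadFactory`) are hypothetical, and the capacity passed to `create` would need tuning against real event rates:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical factory for a single-threaded publisher with a bounded queue. */
final class BoundedEventPublisher {
    // Number of events discarded by the rejection policy (hypothetical metric).
    static final AtomicLong DROPPED = new AtomicLong();

    private BoundedEventPublisher() { }

    static ThreadPoolExecutor create(int queueCapacity) {
        return new ThreadPoolExecutor(
                1, 1,                                    // exactly one worker thread, as before
                0L, TimeUnit.MILLISECONDS,               // keep-alive is irrelevant for a core-only pool
                new ArrayBlockingQueue<>(queueCapacity), // bounded instead of unbounded LinkedBlockingQueue
                r -> new Thread(r, "AsyncJobMgr-EventBus"),
                (task, executor) -> {                    // log+drop: never block or grow memory
                    DROPPED.incrementAndGet();
                    System.err.println("Event bus queue full; dropping job event");
                });
    }
}
```

Swapping the handler for `new ThreadPoolExecutor.CallerRunsPolicy()` would also bound memory, but once the queue fills it runs the task on the submitting thread, so a blocked `publish()` could stall API threads again.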
##########
framework/jobs/src/main/java/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java:
##########
@@ -1378,6 +1379,7 @@ public boolean start() {
@Override
public boolean stop() {
_heartbeatScheduler.shutdown();
+ _eventBusPublisher.shutdown();
_apiJobExecutor.shutdown();
_workerJobExecutor.shutdown();
Review Comment:
`stop()` only calls `_eventBusPublisher.shutdown()` and returns immediately.
Because `NamedThreadFactory` creates non-daemon threads, a slow or stuck publish
can keep the JVM alive longer than expected during shutdown, and queued events
may continue to run while other components are already stopping. Consider
awaiting termination with a timeout and, if it elapses, calling `shutdownNow()`
(with logging) so that shutdown behaves predictably.
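One way to implement the timed shutdown, sketched as a hypothetical helper (`ExecutorShutdown.shutdownGracefully`); logging via `System.err` is a placeholder for whatever logger `stop()` would actually use:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper for a bounded-time executor shutdown. */
final class ExecutorShutdown {
    private ExecutorShutdown() { }

    /**
     * Stops accepting new tasks and waits up to {@code timeout} for queued work
     * to finish; if that fails, interrupts the running task and discards the rest.
     * Returns true only if the executor terminated within the timeout.
     */
    static boolean shutdownGracefully(ExecutorService executor, long timeout, TimeUnit unit) {
        executor.shutdown(); // no new tasks; already-queued tasks still run
        try {
            if (executor.awaitTermination(timeout, unit)) {
                return true;
            }
            // Timeout elapsed: interrupt the active task and drop queued ones.
            int dropped = executor.shutdownNow().size();
            System.err.println("Executor did not stop within " + timeout + " " + unit
                    + "; discarded " + dropped + " queued task(s)");
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the caller's interrupt status
        }
        return false;
    }
}
```

`stop()` could then call something like `shutdownGracefully(_eventBusPublisher, 10, TimeUnit.SECONDS)` (the timeout is illustrative). Note that `shutdownNow()` only interrupts the running task, so a `publish()` that ignores interruption can still outlive the call.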
##########
framework/jobs/src/main/java/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java:
##########
@@ -1397,8 +1399,17 @@ protected AsyncJobManagerImpl() {
}
private void publishOnEventBus(AsyncJob job, String jobEvent) {
- _messageBus.publish(null, AsyncJob.Topics.JOB_EVENT_PUBLISH, PublishScope.LOCAL,
- new Pair<AsyncJob, String>(job, jobEvent));
+ try {
+ _eventBusPublisher.submit(new ManagedContextRunnable() {
+ @Override
+ protected void runInContext() {
+ _messageBus.publish(null, AsyncJob.Topics.JOB_EVENT_PUBLISH, PublishScope.LOCAL,
+ new Pair<AsyncJob, String>(job, jobEvent));
Review Comment:
Exceptions thrown by `_messageBus.publish(...)` inside the submitted task
will be captured by the executor and effectively ignored because the returned
`Future` is not observed. This can silently drop job events while the API call
continues normally. Consider wrapping the publish call in a try/catch (at least
`Exception`, ideally `Throwable`) within `runInContext()` and logging failures
(optionally including jobId/jobEvent) so operational issues are visible.
```suggestion
try {
    _messageBus.publish(null, AsyncJob.Topics.JOB_EVENT_PUBLISH, PublishScope.LOCAL,
            new Pair<AsyncJob, String>(job, jobEvent));
} catch (Throwable t) {
    logger.warn("Failed to publish async job event on message bus. jobId={}, jobEvent={}",
            job != null ? job.getId() : null, jobEvent, t);
}
```
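A generic form of the suggested try/catch, sketched as a hypothetical `SafeTask.logging` wrapper that takes a `Consumer<String>` in place of `logger.warn`; the same pattern applies to any task handed to `submit()`:

```java
import java.util.function.Consumer;

/** Hypothetical helper mirroring the suggested try/catch around publish(). */
final class SafeTask {
    private SafeTask() { }

    /**
     * Wraps a task so that any Throwable it throws is reported through the
     * given logger instead of being stored in a Future nobody reads.
     */
    static Runnable logging(Runnable task, String context, Consumer<String> log) {
        return () -> {
            try {
                task.run();
            } catch (Throwable t) {
                log.accept("Task failed (" + context + "): " + t);
            }
        };
    }
}
```

Without such a wrapper, the exception is only observable through `Future.get()`, which throws `ExecutionException` with the original exception as its cause.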
##########
framework/jobs/src/main/java/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java:
##########
@@ -1397,8 +1399,17 @@ protected AsyncJobManagerImpl() {
}
private void publishOnEventBus(AsyncJob job, String jobEvent) {
- _messageBus.publish(null, AsyncJob.Topics.JOB_EVENT_PUBLISH, PublishScope.LOCAL,
- new Pair<AsyncJob, String>(job, jobEvent));
+ try {
+ _eventBusPublisher.submit(new ManagedContextRunnable() {
+ @Override
+ protected void runInContext() {
+ _messageBus.publish(null, AsyncJob.Topics.JOB_EVENT_PUBLISH, PublishScope.LOCAL,
+ new Pair<AsyncJob, String>(job, jobEvent));
+ }
+ });
Review Comment:
`publishOnEventBus` now captures and publishes the live `AsyncJob` object
asynchronously. Callers frequently mutate the same `AsyncJobVO` instance after
calling `publishOnEventBus` (e.g., `updateAsyncJobStatus` mutates `job` inside
the following transaction), so subscribers may observe a job state that no
longer corresponds to the `jobEvent` being emitted (notably, the "submit" event
can be delivered with a job that is already in progress or completed). To
preserve event semantics, publish an immutable snapshot (e.g., jobId, jobEvent,
and selected fields copied at enqueue time), or re-fetch a fresh immutable view
inside the publisher thread.
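A minimal sketch of the enqueue-time snapshot option. `MutableJob` stands in for `AsyncJobVO`, `JobEventSnapshot` and `SnapshotPublisher` are hypothetical, and the copied fields (id and a status string) are only examples of what subscribers might need:

```java
import java.util.concurrent.Executor;
import java.util.function.Consumer;

/** Hypothetical stand-in for the mutable job entity. */
final class MutableJob {
    final long id;
    String status; // mutated by callers after publishing
    MutableJob(long id, String status) { this.id = id; this.status = status; }
}

/** Immutable event payload; every field is copied when the event is enqueued. */
final class JobEventSnapshot {
    final long jobId;
    final String status;
    final String jobEvent;

    private JobEventSnapshot(long jobId, String status, String jobEvent) {
        this.jobId = jobId;
        this.status = status;
        this.jobEvent = jobEvent;
    }

    static JobEventSnapshot of(MutableJob job, String jobEvent) {
        return new JobEventSnapshot(job.id, job.status, jobEvent);
    }
}

/** Hypothetical publisher that hands only the snapshot to the async executor. */
final class SnapshotPublisher {
    private SnapshotPublisher() { }

    static void publishAsync(Executor executor, MutableJob job, String jobEvent,
                             Consumer<JobEventSnapshot> subscriber) {
        JobEventSnapshot snapshot = JobEventSnapshot.of(job, jobEvent); // copied on the caller's thread
        executor.execute(() -> subscriber.accept(snapshot));            // live job is never captured
    }
}
```

Because the copy is taken before `execute()`, a later mutation such as the one in `updateAsyncJobStatus` cannot change what subscribers receive. A plain final class is used rather than a `record` to avoid assuming a Java 16+ baseline.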
--
This is an automated message from the Apache Git Service.