danielhumanmod commented on code in PR #1478:
URL: https://github.com/apache/datafusion-ballista/pull/1478#discussion_r2868390137
##########
ballista/scheduler/src/cluster/memory.rs:
##########
@@ -450,12 +472,23 @@ impl JobState for InMemoryJobState {
status.status,
Some(Status::Successful(_)) | Some(Status::Failed(_))
) {
+ if let Some((_, job_info)) = self.running_jobs.remove(job_id) {
+ job_info.update_subscribers(status.clone()).await;
+ }
+
self.completed_jobs
.insert(job_id.to_string(), (status.clone(), Some(graph.cloned())));
- self.running_jobs.remove(job_id);
} else {
// otherwise update running job
- self.running_jobs.insert(job_id.to_string(), status.clone());
+ if let Some(mut job_info) = self.running_jobs.get_mut(job_id) {
+ job_info.update_subscribers(status.clone()).await;
Review Comment:
This code awaits while holding a DashMap shard lock (the get_mut() guard). Because .await can suspend the future while the guard is still alive, other accesses to the same shard may block until the future resumes and drops the guard. Should we extract the subscribers, release the lock, and then call update_subscribers(...).await?
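A minimal sketch of the suggested shape, assuming hypothetical JobInfo, Subscriber and JobStatus types and a hypothetical per-subscriber notify() (the real Ballista types and the signature of update_subscribers are not shown in this hunk). To keep it dependency-free, a std::sync::Mutex<HashMap> stands in for the DashMap shard lock and a tiny block_on stands in for a real runtime such as Tokio. The point is the scoping: the status is written and the subscriber handles are cloned inside a block, so the guard is dropped before the first .await.

```rust
use std::collections::HashMap;
use std::future::Future;
use std::pin::pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

// Hypothetical stand-in for the job status type.
#[derive(Clone, Debug, PartialEq)]
struct JobStatus(String);

// Hypothetical subscriber handle; cloning only bumps an Arc refcount.
#[derive(Clone, Default)]
struct Subscriber {
    received: Arc<Mutex<Vec<JobStatus>>>,
}

impl Subscriber {
    // Hypothetical async notification; a real one might send on a channel.
    async fn notify(&self, status: JobStatus) {
        self.received.lock().unwrap().push(status);
    }
}

// Hypothetical per-job entry stored in the running-jobs map.
struct JobInfo {
    status: JobStatus,
    subscribers: Vec<Subscriber>,
}

struct JobState {
    // Stand-in for the DashMap: a single lock guarding the running jobs.
    running_jobs: Mutex<HashMap<String, JobInfo>>,
}

impl JobState {
    async fn update_running_job(&self, job_id: &str, status: JobStatus) {
        // Update the entry and copy out the subscriber handles in this block;
        // the guard is dropped at the closing brace, before any `.await`.
        let subscribers = {
            let mut jobs = self.running_jobs.lock().unwrap();
            let Some(info) = jobs.get_mut(job_id) else { return };
            info.status = status.clone();
            info.subscribers.clone()
        };
        // No lock is held here, so suspending cannot block other map users.
        for subscriber in &subscribers {
            subscriber.notify(status.clone()).await;
        }
    }
}

// Minimal single-future executor: parks the thread until woken.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(value) => return value,
            Poll::Pending => thread::park(),
        }
    }
}

fn main() {
    let subscriber = Subscriber::default();
    let mut jobs = HashMap::new();
    jobs.insert(
        "job-1".to_string(),
        JobInfo { status: JobStatus("Queued".into()), subscribers: vec![subscriber.clone()] },
    );
    let state = JobState { running_jobs: Mutex::new(jobs) };
    block_on(state.update_running_job("job-1", JobStatus("Running".into())));
    // The lock is free again, so it can be re-acquired here.
    assert_eq!(state.running_jobs.lock().unwrap()["job-1"].status, JobStatus("Running".into()));
    println!("{:?}", subscriber.received.lock().unwrap()); // prints [JobStatus("Running")]
}
```

With a real DashMap the same shape should apply, e.g. self.running_jobs.get_mut(job_id).map(|mut e| { e.status = status.clone(); e.subscribers.clone() }), where the RefMut is consumed by the closure and dropped before anything is awaited (field names again hypothetical). One trade-off: two concurrent updates to the same job can now notify subscribers out of order, since neither holds the lock while sending.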
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]