This is an automated email from the ASF dual-hosted git repository.
heneveld pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brooklyn-server.git
The following commit(s) were added to refs/heads/master by this push:
new 13129375af better CME protection for workflows
new a3fc47a7bf Merge branch 'master' of https://gitbox.apache.org/repos/asf/brooklyn-server
13129375af is described below
commit 13129375afb99a3a6e733b3cad9fa5419d2acbc0
Author: Alex Heneveld <[email protected]>
AuthorDate: Fri Sep 29 11:00:26 2023 +0100
better CME protection for workflows
---
.../store/WorkflowStateActiveInMemory.java | 37 ++++++++++++++++------
.../store/WorkflowStatePersistenceViaSensors.java | 2 +-
2 files changed, 29 insertions(+), 10 deletions(-)
diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/store/WorkflowStateActiveInMemory.java b/core/src/main/java/org/apache/brooklyn/core/workflow/store/WorkflowStateActiveInMemory.java
index 762b26e3e8..d27a161a16 100644
--- a/core/src/main/java/org/apache/brooklyn/core/workflow/store/WorkflowStateActiveInMemory.java
+++ b/core/src/main/java/org/apache/brooklyn/core/workflow/store/WorkflowStateActiveInMemory.java
@@ -79,9 +79,13 @@ public class WorkflowStateActiveInMemory {
public void checkpoint(WorkflowExecutionContext context) {
// keep active workflows in memory, even if disabled
- Map<String, WorkflowExecutionContext> entityActiveWorkflows = getSynchronizedForWorkflowId(context.getEntity().getId());
+ Map<String, WorkflowExecutionContext> entityActiveWorkflows = getForWorkflowIdWithLockButResultNeedsSynch(context.getEntity().getId());
if (context.getStatus().expirable) {
- if (entityActiveWorkflows!=null) entityActiveWorkflows.remove(context.getWorkflowId());
+ if (entityActiveWorkflows!=null) {
+ synchronized (entityActiveWorkflows) {
+ entityActiveWorkflows.remove(context.getWorkflowId());
+ }
+ }
} else {
if (entityActiveWorkflows==null) {
synchronized (data) {
@@ -92,7 +96,9 @@ public class WorkflowStateActiveInMemory {
}
}
}
- entityActiveWorkflows.put(context.getWorkflowId(), context);
+ synchronized (entityActiveWorkflows) {
+ entityActiveWorkflows.put(context.getWorkflowId(), context);
+ }
}
if (lastInMemClear + GLOBAL_UPDATE_FREQUENCY < System.currentTimeMillis()) {
@@ -101,28 +107,41 @@ public class WorkflowStateActiveInMemory {
}
}
+ /** @deprecated since 1.1 returns a _copy_; use the method which makes that explicit */
public Map<String,WorkflowExecutionContext> getWorkflows(Entity entity) {
- return getSynchronizedForWorkflowId(entity.getId());
+ return getWorkflowsCopy(entity);
+ }
+ public MutableMap<String,WorkflowExecutionContext> getWorkflowsCopy(Entity entity) {
+ Map<String, WorkflowExecutionContext> entityActiveWorkflows = getForWorkflowIdWithLockButResultNeedsSynch(entity.getId());
+ if (entityActiveWorkflows == null) return MutableMap.of();
+ synchronized (entityActiveWorkflows) {
+ return MutableMap.copyOf(entityActiveWorkflows);
+ }
}
boolean deleteWorkflow(WorkflowExecutionContext context) {
- Map<String, WorkflowExecutionContext> entityActiveWorkflows = getSynchronizedForWorkflowId(context.getEntity().getId());
+ Map<String, WorkflowExecutionContext> entityActiveWorkflows = getForWorkflowIdWithLockButResultNeedsSynch(context.getEntity().getId());
if (entityActiveWorkflows!=null) {
- return entityActiveWorkflows.remove(context.getWorkflowId()) != null;
+ synchronized (entityActiveWorkflows) {
+ return entityActiveWorkflows.remove(context.getWorkflowId()) != null;
+ }
}
return false;
}
- private Map<String, WorkflowExecutionContext> getSynchronizedForWorkflowId(String entityId) {
+ // note: callers should subsequently sync on the returned map
+ private Map<String, WorkflowExecutionContext> getForWorkflowIdWithLockButResultNeedsSynch(String entityId) {
synchronized (data) {
return data.get(entityId);
}
}
public WorkflowExecutionContext getFromTag(BrooklynTaskTags.WorkflowTaskTag tag) {
- Map<String, WorkflowExecutionContext> activeForEntity = getSynchronizedForWorkflowId(tag.getEntityId());
+ Map<String, WorkflowExecutionContext> activeForEntity = getForWorkflowIdWithLockButResultNeedsSynch(tag.getEntityId());
if (activeForEntity!=null) {
- return activeForEntity.get(tag.getWorkflowId());
+ synchronized (activeForEntity) {
+ return activeForEntity.get(tag.getWorkflowId());
+ }
}
return null;
}
diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/store/WorkflowStatePersistenceViaSensors.java b/core/src/main/java/org/apache/brooklyn/core/workflow/store/WorkflowStatePersistenceViaSensors.java
index 51cb28a068..63170c539b 100644
--- a/core/src/main/java/org/apache/brooklyn/core/workflow/store/WorkflowStatePersistenceViaSensors.java
+++ b/core/src/main/java/org/apache/brooklyn/core/workflow/store/WorkflowStatePersistenceViaSensors.java
@@ -168,7 +168,7 @@ public class WorkflowStatePersistenceViaSensors {
}
public Map<String,WorkflowExecutionContext> getWorkflows(Entity entity) {
- MutableMap<String, WorkflowExecutionContext> result = MutableMap.copyOf(WorkflowStateActiveInMemory.get(mgmt).getWorkflows(entity));
+ MutableMap<String, WorkflowExecutionContext> result = WorkflowStateActiveInMemory.get(mgmt).getWorkflowsCopy(entity);
result.add(entity.sensors().get(INTERNAL_WORKFLOWS));
return result;
}