This is an automated email from the ASF dual-hosted git repository.

heneveld pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brooklyn-server.git


The following commit(s) were added to refs/heads/master by this push:
     new 13129375af better CME protection for workflows
     new a3fc47a7bf Merge branch 'master' of https://gitbox.apache.org/repos/asf/brooklyn-server
13129375af is described below

commit 13129375afb99a3a6e733b3cad9fa5419d2acbc0
Author: Alex Heneveld <[email protected]>
AuthorDate: Fri Sep 29 11:00:26 2023 +0100

    better CME protection for workflows
---
 .../store/WorkflowStateActiveInMemory.java         | 37 ++++++++++++++++------
 .../store/WorkflowStatePersistenceViaSensors.java  |  2 +-
 2 files changed, 29 insertions(+), 10 deletions(-)

diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/store/WorkflowStateActiveInMemory.java b/core/src/main/java/org/apache/brooklyn/core/workflow/store/WorkflowStateActiveInMemory.java
index 762b26e3e8..d27a161a16 100644
--- a/core/src/main/java/org/apache/brooklyn/core/workflow/store/WorkflowStateActiveInMemory.java
+++ b/core/src/main/java/org/apache/brooklyn/core/workflow/store/WorkflowStateActiveInMemory.java
@@ -79,9 +79,13 @@ public class WorkflowStateActiveInMemory {
 
     public void checkpoint(WorkflowExecutionContext context) {
         // keep active workflows in memory, even if disabled
-        Map<String, WorkflowExecutionContext> entityActiveWorkflows = getSynchronizedForWorkflowId(context.getEntity().getId());
+        Map<String, WorkflowExecutionContext> entityActiveWorkflows = getForWorkflowIdWithLockButResultNeedsSynch(context.getEntity().getId());
         if (context.getStatus().expirable) {
-            if (entityActiveWorkflows!=null) entityActiveWorkflows.remove(context.getWorkflowId());
+            if (entityActiveWorkflows!=null) {
+                synchronized (entityActiveWorkflows) {
+                    entityActiveWorkflows.remove(context.getWorkflowId());
+                }
+            }
         } else {
             if (entityActiveWorkflows==null) {
                 synchronized (data) {
@@ -92,7 +96,9 @@ public class WorkflowStateActiveInMemory {
                     }
                 }
             }
-            entityActiveWorkflows.put(context.getWorkflowId(), context);
+            synchronized (entityActiveWorkflows) {
+                entityActiveWorkflows.put(context.getWorkflowId(), context);
+            }
         }
 
         if (lastInMemClear + GLOBAL_UPDATE_FREQUENCY < System.currentTimeMillis()) {
@@ -101,28 +107,41 @@ public class WorkflowStateActiveInMemory {
         }
     }
 
+    /** @deprecated since 1.1 returns a _copy_; use the method which makes that explicit */
     public Map<String,WorkflowExecutionContext> getWorkflows(Entity entity) {
-        return getSynchronizedForWorkflowId(entity.getId());
+        return getWorkflowsCopy(entity);
+    }
+    public MutableMap<String,WorkflowExecutionContext> getWorkflowsCopy(Entity entity) {
+        Map<String, WorkflowExecutionContext> entityActiveWorkflows = getForWorkflowIdWithLockButResultNeedsSynch(entity.getId());
+        if (entityActiveWorkflows == null) return MutableMap.of();
+        synchronized (entityActiveWorkflows) {
+            return MutableMap.copyOf(entityActiveWorkflows);
+        }
     }
 
     boolean deleteWorkflow(WorkflowExecutionContext context) {
-        Map<String, WorkflowExecutionContext> entityActiveWorkflows = getSynchronizedForWorkflowId(context.getEntity().getId());
+        Map<String, WorkflowExecutionContext> entityActiveWorkflows = getForWorkflowIdWithLockButResultNeedsSynch(context.getEntity().getId());
         if (entityActiveWorkflows!=null) {
-            return entityActiveWorkflows.remove(context.getWorkflowId()) != null;
+            synchronized (entityActiveWorkflows) {
+                return entityActiveWorkflows.remove(context.getWorkflowId()) != null;
+            }
         }
         return false;
     }
 
-    private Map<String, WorkflowExecutionContext> getSynchronizedForWorkflowId(String entityId) {
+    // note: callers should subsequently sync on the returned map
+    private Map<String, WorkflowExecutionContext> getForWorkflowIdWithLockButResultNeedsSynch(String entityId) {
         synchronized (data) {
             return data.get(entityId);
         }
     }
 
     public WorkflowExecutionContext getFromTag(BrooklynTaskTags.WorkflowTaskTag tag) {
-        Map<String, WorkflowExecutionContext> activeForEntity = getSynchronizedForWorkflowId(tag.getEntityId());
+        Map<String, WorkflowExecutionContext> activeForEntity = getForWorkflowIdWithLockButResultNeedsSynch(tag.getEntityId());
         if (activeForEntity!=null) {
-            return activeForEntity.get(tag.getWorkflowId());
+            synchronized (activeForEntity) {
+                return activeForEntity.get(tag.getWorkflowId());
+            }
         }
         return null;
     }
diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/store/WorkflowStatePersistenceViaSensors.java b/core/src/main/java/org/apache/brooklyn/core/workflow/store/WorkflowStatePersistenceViaSensors.java
index 51cb28a068..63170c539b 100644
--- a/core/src/main/java/org/apache/brooklyn/core/workflow/store/WorkflowStatePersistenceViaSensors.java
+++ b/core/src/main/java/org/apache/brooklyn/core/workflow/store/WorkflowStatePersistenceViaSensors.java
@@ -168,7 +168,7 @@ public class WorkflowStatePersistenceViaSensors {
     }
 
     public Map<String,WorkflowExecutionContext> getWorkflows(Entity entity) {
-        MutableMap<String, WorkflowExecutionContext> result = MutableMap.copyOf(WorkflowStateActiveInMemory.get(mgmt).getWorkflows(entity));
+        MutableMap<String, WorkflowExecutionContext> result = WorkflowStateActiveInMemory.get(mgmt).getWorkflowsCopy(entity);
         result.add(entity.sensors().get(INTERNAL_WORKFLOWS));
         return result;
     }

Reply via email to