This is an automated email from the ASF dual-hosted git repository.

heneveld pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brooklyn-server.git

commit cf5937fe3031ecd0a4d877fc31ea8b94aa3f4f3c
Author: Alex Heneveld <[email protected]>
AuthorDate: Tue Aug 16 11:50:51 2022 +0100

    prevent GC from looping waiting for non-deletable tasks, and make it more efficient
    
    by checking up front for tasks that won't be deletable anyway
---
 .../mgmt/internal/BrooklynGarbageCollector.java    | 41 ++++++++--
 .../util/core/task/BasicExecutionManager.java      | 90 ++++++++++++++--------
 .../org/apache/brooklyn/util/core/task/Tasks.java  | 15 +++-
 3 files changed, 107 insertions(+), 39 deletions(-)

diff --git 
a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BrooklynGarbageCollector.java
 
b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BrooklynGarbageCollector.java
index f66823b174..5c3b60dc79 100644
--- 
a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BrooklynGarbageCollector.java
+++ 
b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BrooklynGarbageCollector.java
@@ -308,6 +308,10 @@ public class BrooklynGarbageCollector {
         if (!task.isDone(true)) {
             return false;
         }
+        if (Tasks.isChildOfSubmitter(task, executionManager::getTask)) {
+            // we don't delete children until the parent is deleted; 
short-circuit the consideration of this
+            return false;
+        }
         
         Set<Object> tags = BrooklynTaskTags.getTagsFast(task);
         if (tags.contains(ManagementContextInternal.TRANSIENT_TASK_TAG))
@@ -418,16 +422,35 @@ public class BrooklynGarbageCollector {
         int deletedHere = 0;
         while ((deletedHere = 
expireHistoricTasksNowReadyForImmediateDeletion()) > 0) {
             // delete in loop so we don't have descendants sticking around 
until deleted in later cycles
-            deletedCount += deletedHere; 
+            deletedCount += deletedHere;
+            if (LOG.isTraceEnabled()) LOG.trace("GC history loop deleted 
"+deletedHere+" this time, count now "+deletedCount);
         }
-        
+
         deletedHere = expireIfOverCapacityGlobally();
+        if (LOG.isTraceEnabled()) LOG.trace("GC history capacity deleted 
"+deletedHere);
         deletedCount += deletedHere;
         while (deletedHere > 0) {
-            deletedCount += (deletedHere = 
expireHistoricTasksNowReadyForImmediateDeletion()); 
+            deletedCount += (deletedHere = 
expireHistoricTasksNowReadyForImmediateDeletion());
+            if (LOG.isTraceEnabled()) LOG.trace("GC history 
post=-capacity-deletion loop deleted "+deletedHere+" this time, count now 
"+deletedCount);
         }
-        
+
         return deletedCount;
+
+        // or not to run in a loop
+//        int deletedHere = expireHistoricTasksNowReadyForImmediateDeletion();
+//        if (LOG.isTraceEnabled()) LOG.trace("GC history loop deleted 
"+deletedHere+" this time, deletion count now "+deletedCount);
+//        deletedCount += deletedHere;
+//
+//        deletedHere = expireIfOverCapacityGlobally();
+//        if (LOG.isTraceEnabled()) LOG.trace("GC history capacity deleted 
"+deletedHere);
+//        deletedCount += deletedHere;
+//        if (deletedHere > 0) {
+//            deletedCount += (deletedHere = 
expireHistoricTasksNowReadyForImmediateDeletion());
+//            if (LOG.isTraceEnabled()) LOG.trace("GC history post-capacity 
deleted "+deletedHere+" this time, count now "+deletedCount);
+//        }
+//
+//        return deletedCount;
+
     }
 
     protected static boolean isTagIgnoredForGc(Object tag) {
@@ -506,6 +529,7 @@ public class BrooklynGarbageCollector {
         Collection<Task<?>> allTasks = executionManager.getAllTasks();
         Collection<Task<?>> tasksToDelete = MutableList.of();
         try {
+            if (LOG.isTraceEnabled()) LOG.trace("GC history scanning 
"+allTasks.size()+" tasks");
             for (Task<?> task: allTasks) {
                 if (!shouldDeleteTaskImmediately(task)) {
                     // 2017-09 previously we only checked done and submitter 
expired, and deleted if both were true
@@ -522,11 +546,14 @@ public class BrooklynGarbageCollector {
             // delete what we've found so far
             LOG.debug("Got CME inspecting aged tasks, with 
"+tasksToDelete.size()+" found for deletion: "+e);
         }
-        
+
+        if (LOG.isTraceEnabled()) LOG.trace("GC history scanned 
"+allTasks.size()+" tasks, found "+tasksToDelete.size()+" for deletion, now 
deleting");
+        int deletionCount = 0;
         for (Task<?> task: tasksToDelete) {
-            executionManager.deleteTask(task);
+            if (executionManager.deleteTask(task)) deletionCount++;
         }
-        return tasksToDelete.size();
+        if (LOG.isTraceEnabled()) LOG.trace("GC history scanned 
"+allTasks.size()+" tasks, "+tasksToDelete.size()+" proposed for deletion, 
actually deleted "+deletionCount);
+        return deletionCount;
     }
     
     private boolean isAssociatedToActiveEntity(Task<?> task) {
diff --git 
a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java
 
b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java
index 504c7f143e..005ba07336 100644
--- 
a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java
+++ 
b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java
@@ -418,14 +418,15 @@ public class BasicExecutionManager implements 
ExecutionManager {
         return false;
     }
 
-    public void deleteTask(Task<?> task) {
-        deleteTask(task, true);
+    public boolean deleteTask(Task<?> task) {
+        return deleteTask(task, true);
     }
     /** removes all exec manager records of a task, except, if second argument 
is true (usually is) keep the pointer to ID
-     * if its submitter is a parent with an active record to this child */
-    public void deleteTask(Task<?> task, boolean keepByIdIfParentPresentById) {
+     * if its submitter is a parent with an active record to this child.
+     * returns true if completely deleted (false if not deleted, or deleted in 
byTags map but kept due to parent ID) */
+    public boolean deleteTask(Task<?> task, boolean 
keepByIdIfParentPresentById) {
         Boolean removed = deleteTaskNonRecursive(task, 
keepByIdIfParentPresentById);
-        if (Boolean.FALSE.equals(removed)) return;
+        if (!Boolean.TRUE.equals(removed)) return false;
 
         if (task instanceof HasTaskChildren) {
             List<Task<?>> children = ImmutableList.copyOf(((HasTaskChildren) 
task).getChildren());
@@ -433,6 +434,7 @@ public class BasicExecutionManager implements 
ExecutionManager {
                 deleteTask(child, keepByIdIfParentPresentById);
             }
         }
+        return true;
     }
 
     protected Boolean deleteTaskNonRecursive(Task<?> task) {
@@ -446,57 +448,56 @@ public class BasicExecutionManager implements 
ExecutionManager {
          in this case it will of course get deleted when its parent/ancestor 
is deleted. */
     protected Boolean deleteTaskNonRecursive(Task<?> task, boolean 
keepByIdIfParentPresentById) {
         Set<?> tags = TaskTags.getTagsFast(checkNotNull(task, "task"));
+        int removedByTagCount = 0;
         for (Object tag : tags) {
             synchronized (tasksByTag) {
                 Set<Task<?>> tasks = tasksWithTagLiveOrNull(tag);
                 if (tasks != null) {
-                    tasks.remove(task);
-                    if (tasks.isEmpty()) {
-                        tasksByTag.remove(tag);
+                    if (tasks.remove(task)) {
+                        removedByTagCount++;
+                        if (tasks.isEmpty()) {
+                            tasksByTag.remove(tag);
+                        }
                     }
                 }
             }
         }
+        int removedByTagMissingCount = tags.size() - removedByTagCount;
 
-        boolean removeById = true;
+        boolean removeById;
         if (keepByIdIfParentPresentById) {
-            String submittedById = task.getSubmittedByTaskId();
-            if (submittedById!=null) {
-                Task<?> submittedBy = tasksById.get(submittedById);
-                if (submittedBy != null && submittedBy instanceof 
HasTaskChildren) {
-                    if (Iterables.contains(((HasTaskChildren) 
submittedBy).getChildren(), task)) {
-                        removeById = false;
-                    }
-                }
-            }
+            removeById = !Tasks.isChildOfSubmitter(task, tasksById::get);
+        } else {
+            removeById = true;
         }
+
         Boolean result;
-        Task<?> removed;
+        Task<?> removedById;
         if (removeById) {
-            removed = tasksById.remove(task.getId());
-            result = removed != null;
+            removedById = tasksById.remove(task.getId());
+            result = removedById != null;
         } else {
-            removed = null;
+            removedById = null;
             result = null;
         }
 
-        incompleteTaskIds.remove(task.getId());
-        if (removed != null && removed.isSubmitted() && !removed.isDone(true)) 
{
-            Entity context = BrooklynTaskTags.getContextEntity(removed);
+        boolean removedIncompleteById = incompleteTaskIds.remove(task.getId());
+        if (removedById != null && removedById.isSubmitted() && 
!removedById.isDone(true)) {
+            Entity context = BrooklynTaskTags.getContextEntity(removedById);
             if (context != null && !Entities.isManaged(context)) {
-                log.debug("Deleting active task on unmanagement of " + context 
+ ": " + removed);
+                log.debug("Deleting active task on unmanagement of " + context 
+ ": " + removedById);
             } else {
-                boolean debugOnly = removed.isDone();
+                boolean debugOnly = removedById.isDone();
 
                 if (debugOnly) {
-                    log.debug("Deleting cancelled task before completion: " + 
removed + "; this task will continue to run in the background outwith " + this);
+                    log.debug("Deleting cancelled task before completion: " + 
removedById + "; this task will continue to run in the background outwith " + 
this);
                 } else {
-                    log.warn("Deleting submitted task before completion: " + 
removed + " (tags " + removed.getTags() + "); this task will continue to run in 
the background outwith " + this + ", but perhaps it should have been 
cancelled?");
+                    log.warn("Deleting submitted task before completion: " + 
removedById + " (tags " + removedById.getTags() + "); this task will continue 
to run in the background outwith " + this + ", but perhaps it should have been 
cancelled?");
                     log.debug("Active task deletion trace", new 
Throwable("Active task deletion trace"));
                 }
             }
         }
-        if (removed != null) {
+        if (removedById != null) {
             task.getTags().forEach(t -> {
                 // remove tags which might have references to entities etc 
(help out garbage collector)
                 if (t instanceof TaskInternal) {
@@ -505,6 +506,32 @@ public class BasicExecutionManager implements 
ExecutionManager {
                 }
             });
         }
+
+        // if tag deletion is problematic we can use this logic to investigate 
(but currently there is no reason to think it is, i was just checking)
+//        if ((!removeById || Boolean.TRUE.equals(result)) && 
removedByTagMissingCount==0) {
+//            // cleanly deleted, or not deleted if so requested
+//            return true;
+//        }
+//        if ((removeById && !Boolean.TRUE.equals(result)) && 
removedByTagCount==0) {
+//            // not deleted at all
+//            return false;
+//        }
+//        // incomplete deletion detected
+//        log.warn("Incomplete deletion for "+task+"; 
removeById="+removeById+", removedById="+removedById+", 
removedIncompleteById="+removedIncompleteById+", 
removedByTagCount="+removedByTagCount+", 
removedByTagMissingCount="+removedByTagMissingCount);
+//        // if tag deletion is not working, investigate each task (don't rely 
on hashes)
+//        synchronized (tasksByTag) {
+//            Set<Object> tagsToDelete = MutableSet.of();
+//            tasksByTag.forEach( (tag, tasks) -> {
+//                if (tasks.removeIf(task::equals)) {
+//                    log.warn("Special deletion needed for tag "+tag+" on 
task "+task);
+//                    if (tasks.isEmpty()) {
+//                        tagsToDelete.add(tag);
+//                    }
+//                }
+//            });
+//            tagsToDelete.forEach(t -> tasksByTag.remove(t));
+//        }
+
         return result;
     }
 
@@ -557,7 +584,8 @@ public class BasicExecutionManager implements 
ExecutionManager {
      * exposes live view, for internal use only
      */
     @Beta
-    public Set<Task<?>> tasksWithTagLiveOrNull(Object tag) {
+    public Set<Task<?>>
+    tasksWithTagLiveOrNull(Object tag) {
         synchronized (tasksByTag) {
             return tasksByTag.get(tag);
         }
diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/Tasks.java 
b/core/src/main/java/org/apache/brooklyn/util/core/task/Tasks.java
index 74fd919a05..ac4b8c60c6 100644
--- a/core/src/main/java/org/apache/brooklyn/util/core/task/Tasks.java
+++ b/core/src/main/java/org/apache/brooklyn/util/core/task/Tasks.java
@@ -557,7 +557,20 @@ public class Tasks {
             Time.sleep(Repeater.DEFAULT_REAL_QUICK_PERIOD);
         }
     }
-    
+
+    public static boolean isChildOfSubmitter(Task<?> task, 
Function<String,Task> parentLookupFunction) {
+        String submittedById = task.getSubmittedByTaskId();
+        if (submittedById!=null) {
+            Task<?> submittedBy = parentLookupFunction.apply(submittedById);
+            if (submittedBy != null && submittedBy instanceof HasTaskChildren) 
{
+                if (Iterables.contains(((HasTaskChildren) 
submittedBy).getChildren(), task)) {
+                    return true;
+                }
+            }
+        }
+        return false;
+    }
+
     /** returns true if either the current thread or the current task is 
interrupted/cancelled */
     public static boolean isInterrupted() {
         if (Thread.currentThread().isInterrupted()) return true;

Reply via email to