This is an automated email from the ASF dual-hosted git repository. heneveld pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/brooklyn-server.git
commit cf5937fe3031ecd0a4d877fc31ea8b94aa3f4f3c Author: Alex Heneveld <[email protected]> AuthorDate: Tue Aug 16 11:50:51 2022 +0100 prevent GC from looping waiting for non-deleteable tasks, and more efficient by looking up tasks that won't be deleteable anyway --- .../mgmt/internal/BrooklynGarbageCollector.java | 41 ++++++++-- .../util/core/task/BasicExecutionManager.java | 90 ++++++++++++++-------- .../org/apache/brooklyn/util/core/task/Tasks.java | 15 +++- 3 files changed, 107 insertions(+), 39 deletions(-) diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BrooklynGarbageCollector.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BrooklynGarbageCollector.java index f66823b174..5c3b60dc79 100644 --- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BrooklynGarbageCollector.java +++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BrooklynGarbageCollector.java @@ -308,6 +308,10 @@ public class BrooklynGarbageCollector { if (!task.isDone(true)) { return false; } + if (Tasks.isChildOfSubmitter(task, executionManager::getTask)) { + // we don't delete children until the parent is deleted; short-circuit the consideration of this + return false; + } Set<Object> tags = BrooklynTaskTags.getTagsFast(task); if (tags.contains(ManagementContextInternal.TRANSIENT_TASK_TAG)) @@ -418,16 +422,35 @@ public class BrooklynGarbageCollector { int deletedHere = 0; while ((deletedHere = expireHistoricTasksNowReadyForImmediateDeletion()) > 0) { // delete in loop so we don't have descendants sticking around until deleted in later cycles - deletedCount += deletedHere; + deletedCount += deletedHere; + if (LOG.isTraceEnabled()) LOG.trace("GC history loop deleted "+deletedHere+" this time, count now "+deletedCount); } - + deletedHere = expireIfOverCapacityGlobally(); + if (LOG.isTraceEnabled()) LOG.trace("GC history capacity deleted "+deletedHere); deletedCount += deletedHere; while (deletedHere > 0) { - deletedCount += (deletedHere = expireHistoricTasksNowReadyForImmediateDeletion()); + deletedCount += (deletedHere = expireHistoricTasksNowReadyForImmediateDeletion()); + if (LOG.isTraceEnabled()) LOG.trace("GC history post=-capacity-deletion loop deleted "+deletedHere+" this time, count now "+deletedCount); } - + return deletedCount; + + // or not to run in a loop +// int deletedHere = expireHistoricTasksNowReadyForImmediateDeletion(); +// if (LOG.isTraceEnabled()) LOG.trace("GC history loop deleted "+deletedHere+" this time, deletion count now "+deletedCount); +// deletedCount += deletedHere; +// +// deletedHere = expireIfOverCapacityGlobally(); +// if (LOG.isTraceEnabled()) LOG.trace("GC history capacity deleted "+deletedHere); +// deletedCount += deletedHere; +// if (deletedHere > 0) { +// deletedCount += (deletedHere = expireHistoricTasksNowReadyForImmediateDeletion()); +// if (LOG.isTraceEnabled()) LOG.trace("GC history post-capacity deleted "+deletedHere+" this time, count now "+deletedCount); +// } +// +// return deletedCount; + } protected static boolean isTagIgnoredForGc(Object tag) { @@ -506,6 +529,7 @@ public class BrooklynGarbageCollector { Collection<Task<?>> allTasks = executionManager.getAllTasks(); Collection<Task<?>> tasksToDelete = MutableList.of(); try { + if (LOG.isTraceEnabled()) LOG.trace("GC history scanning "+allTasks.size()+" tasks"); for (Task<?> task: allTasks) { if (!shouldDeleteTaskImmediately(task)) { // 2017-09 previously we only checked done and submitter expired, and deleted if both were true @@ -522,11 +546,14 @@ public class BrooklynGarbageCollector { // delete what we've found so far LOG.debug("Got CME inspecting aged tasks, with "+tasksToDelete.size()+" found for deletion: "+e); } - + + if (LOG.isTraceEnabled()) LOG.trace("GC history scanned "+allTasks.size()+" tasks, found "+tasksToDelete.size()+" for deletion, now deleting"); + int deletionCount = 0; for (Task<?> task: tasksToDelete) { - executionManager.deleteTask(task); + if (executionManager.deleteTask(task)) deletionCount++; } - return tasksToDelete.size(); + if (LOG.isTraceEnabled()) LOG.trace("GC history scanned "+allTasks.size()+" tasks, "+tasksToDelete.size()+" proposed for deletion, actually deleted "+deletionCount); + return deletionCount; } private boolean isAssociatedToActiveEntity(Task<?> task) { diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java index 504c7f143e..005ba07336 100644 --- a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java +++ b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java @@ -418,14 +418,15 @@ public class BasicExecutionManager implements ExecutionManager { return false; } - public void deleteTask(Task<?> task) { - deleteTask(task, true); + public boolean deleteTask(Task<?> task) { + return deleteTask(task, true); } /** removes all exec manager records of a task, except, if second argument is true (usually is) keep the pointer to ID - * if its submitter is a parent with an active record to this child */ - public void deleteTask(Task<?> task, boolean keepByIdIfParentPresentById) { + * if its submitter is a parent with an active record to this child. + * returns true if completely deleted (false if not deleted, or deleted in byTags map but kept due to parent ID) */ + public boolean deleteTask(Task<?> task, boolean keepByIdIfParentPresentById) { Boolean removed = deleteTaskNonRecursive(task, keepByIdIfParentPresentById); - if (Boolean.FALSE.equals(removed)) return; + if (!Boolean.TRUE.equals(removed)) return false; if (task instanceof HasTaskChildren) { List<Task<?>> children = ImmutableList.copyOf(((HasTaskChildren) task).getChildren()); @@ -433,6 +434,7 @@ public class BasicExecutionManager implements ExecutionManager { deleteTask(child, keepByIdIfParentPresentById); } } + return true; } protected Boolean deleteTaskNonRecursive(Task<?> task) { @@ -446,57 +448,56 @@ public class BasicExecutionManager implements ExecutionManager { in this case it will of course get deleted when its parent/ancestor is deleted. */ protected Boolean deleteTaskNonRecursive(Task<?> task, boolean keepByIdIfParentPresentById) { Set<?> tags = TaskTags.getTagsFast(checkNotNull(task, "task")); + int removedByTagCount = 0; for (Object tag : tags) { synchronized (tasksByTag) { Set<Task<?>> tasks = tasksWithTagLiveOrNull(tag); if (tasks != null) { - tasks.remove(task); - if (tasks.isEmpty()) { - tasksByTag.remove(tag); + if (tasks.remove(task)) { + removedByTagCount++; + if (tasks.isEmpty()) { + tasksByTag.remove(tag); + } } } } } + int removedByTagMissingCount = tags.size() - removedByTagCount; - boolean removeById = true; + boolean removeById; if (keepByIdIfParentPresentById) { - String submittedById = task.getSubmittedByTaskId(); - if (submittedById!=null) { - Task<?> submittedBy = tasksById.get(submittedById); - if (submittedBy != null && submittedBy instanceof HasTaskChildren) { - if (Iterables.contains(((HasTaskChildren) submittedBy).getChildren(), task)) { - removeById = false; - } - } - } + removeById = !Tasks.isChildOfSubmitter(task, tasksById::get); + } else { + removeById = true; } + Boolean result; - Task<?> removed; + Task<?> removedById; if (removeById) { - removed = tasksById.remove(task.getId()); - result = removed != null; + removedById = tasksById.remove(task.getId()); + result = removedById != null; } else { - removed = null; + removedById = null; result = null; } - incompleteTaskIds.remove(task.getId()); - if (removed != null && removed.isSubmitted() && !removed.isDone(true)) { - Entity context = BrooklynTaskTags.getContextEntity(removed); + boolean removedIncompleteById = incompleteTaskIds.remove(task.getId()); + if (removedById != null && removedById.isSubmitted() && !removedById.isDone(true)) { + Entity context = BrooklynTaskTags.getContextEntity(removedById); if (context != null && !Entities.isManaged(context)) { - log.debug("Deleting active task on unmanagement of " + context + ": " + removed); + log.debug("Deleting active task on unmanagement of " + context + ": " + removedById); } else { - boolean debugOnly = removed.isDone(); + boolean debugOnly = removedById.isDone(); if (debugOnly) { - log.debug("Deleting cancelled task before completion: " + removed + "; this task will continue to run in the background outwith " + this); + log.debug("Deleting cancelled task before completion: " + removedById + "; this task will continue to run in the background outwith " + this); } else { - log.warn("Deleting submitted task before completion: " + removed + " (tags " + removed.getTags() + "); this task will continue to run in the background outwith " + this + ", but perhaps it should have been cancelled?"); + log.warn("Deleting submitted task before completion: " + removedById + " (tags " + removedById.getTags() + "); this task will continue to run in the background outwith " + this + ", but perhaps it should have been cancelled?"); log.debug("Active task deletion trace", new Throwable("Active task deletion trace")); } } } - if (removed != null) { + if (removedById != null) { task.getTags().forEach(t -> { // remove tags which might have references to entities etc (help out garbage collector) if (t instanceof TaskInternal) { @@ -505,6 +506,32 @@ public class BasicExecutionManager implements ExecutionManager { } }); } + + // if tag deletion is problematic we can use this logic to investigate (but currently there is no reason to think it is, i was just checking) +// if ((!removeById || Boolean.TRUE.equals(result)) && removedByTagMissingCount==0) { +// // cleanly deleted, or not deleted if so requested +// return true; +// } +// if ((removeById && !Boolean.TRUE.equals(result)) && removedByTagCount==0) { +// // not deleted at all +// return false; +// } +// // incomplete deletion detected +// log.warn("Incomplete deletion for "+task+"; removeById="+removeById+", removedById="+removedById+", removedIncompleteById="+removedIncompleteById+", removedByTagCount="+removedByTagCount+", removedByTagMissingCount="+removedByTagMissingCount); +// // if tag deletion is not working, investigate each task (don't rely on hashes) +// synchronized (tasksByTag) { +// Set<Object> tagsToDelete = MutableSet.of(); +// tasksByTag.forEach( (tag, tasks) -> { +// if (tasks.removeIf(task::equals)) { +// log.warn("Special deletion needed for tag "+tag+" on task "+task); +// if (tasks.isEmpty()) { +// tagsToDelete.add(tag); +// } +// } +// }); +// tagsToDelete.forEach(t -> tasksByTag.remove(t)); +// } + return result; } @@ -557,7 +584,8 @@ public class BasicExecutionManager implements ExecutionManager { * exposes live view, for internal use only */ @Beta - public Set<Task<?>> tasksWithTagLiveOrNull(Object tag) { + public Set<Task<?>> + tasksWithTagLiveOrNull(Object tag) { synchronized (tasksByTag) { return tasksByTag.get(tag); } diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/Tasks.java b/core/src/main/java/org/apache/brooklyn/util/core/task/Tasks.java index 74fd919a05..ac4b8c60c6 100644 --- a/core/src/main/java/org/apache/brooklyn/util/core/task/Tasks.java +++ b/core/src/main/java/org/apache/brooklyn/util/core/task/Tasks.java @@ -557,7 +557,20 @@ public class Tasks { Time.sleep(Repeater.DEFAULT_REAL_QUICK_PERIOD); } } - + + public static boolean isChildOfSubmitter(Task<?> task, Function<String,Task> parentLookupFunction) { + String submittedById = task.getSubmittedByTaskId(); + if (submittedById!=null) { + Task<?> submittedBy = parentLookupFunction.apply(submittedById); + if (submittedBy != null && submittedBy instanceof HasTaskChildren) { + if (Iterables.contains(((HasTaskChildren) submittedBy).getChildren(), task)) { + return true; + } + } + } + return false; + } + /** returns true if either the current thread or the current task is interrupted/cancelled */ public static boolean isInterrupted() { if (Thread.currentThread().isInterrupted()) return true;
