This is an automated email from the ASF dual-hosted git repository. heneveld pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/brooklyn-server.git
commit 783b243fdea0a70d97fc9db6a50d73c1dde44a4d Author: Alex Heneveld <[email protected]> AuthorDate: Tue Sep 14 11:15:10 2021 +0100 only delete tasks which are done including completion if cancelled --- .../mgmt/internal/BrooklynGarbageCollector.java | 11 +++-------- .../brooklyn/core/mgmt/rebind/RebindIteration.java | 22 ++++++++++++---------- .../util/core/task/BasicExecutionManager.java | 10 ++++++++-- 3 files changed, 23 insertions(+), 20 deletions(-) diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BrooklynGarbageCollector.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BrooklynGarbageCollector.java index b1bc870..74f89a4 100644 --- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BrooklynGarbageCollector.java +++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BrooklynGarbageCollector.java @@ -531,7 +531,7 @@ public class BrooklynGarbageCollector { return false; } Task<?> submitter = task.getSubmittedByTask(); - if (submitter!=null && (!submitter.isDone() || executionManager.getTask(submitter.getId())!=null)) { + if (submitter!=null && (!submitter.isDone(true) || executionManager.getTask(submitter.getId())!=null)) { return false; } // submitter task is GC'd @@ -679,13 +679,8 @@ public class BrooklynGarbageCollector { tasksLive = executionManager.getTasksWithAllTags(MutableList.of()); } - MutableList<Task<?>> tasks = MutableList.of(); - for (Task<?> task: tasksLive) { - if (task.isDone()) { - tasks.add(task); - } - } - + List<Task<?>> tasks = tasksLive.stream().filter(t -> t.isDone(true)).collect(Collectors.toList()); + int numToDelete = tasks.size() - brooklynProperties.getConfig(MAX_TASKS_GLOBAL); if (numToDelete <= 0) { LOG.debug("brooklyn-gc detected only "+tasks.size()+" completed tasks in memory, not over global limit, so not deleting any"); diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindIteration.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindIteration.java index c3a7ab7..ede24a9 100644 --- a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindIteration.java +++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindIteration.java @@ -267,27 +267,28 @@ public abstract class RebindIteration { // wait for tasks Collection<Task<?>> entityTasks = ((BasicExecutionManager) managementContext.getExecutionManager()).allTasksLive() .stream().filter(t -> BrooklynTaskTags.getContextEntity(t) != null).collect(Collectors.toList()); - List<Task<?>> openTasks; + List<Task<?>> openTasksIncludingCancelled; CountdownTimer time = CountdownTimer.newInstanceStarted(Duration.seconds(15)); do { - openTasks = entityTasks.stream().filter(t -> !t.isDone()).collect(Collectors.toList()); - if (openTasks.isEmpty()) break; - if (time.isExpired()) { - LOG.warn("Aborting " + openTasks.size() + " incomplete task(s) before rebinding again: " + openTasks); - openTasks.forEach(t -> t.cancel(true)); + openTasksIncludingCancelled = entityTasks.stream().filter(t -> !t.isDone(true)).collect(Collectors.toList()); + List<Task<?>> openTasksCancellable = openTasksIncludingCancelled.stream().filter(t -> !t.isDone()).collect(Collectors.toList()); + if (openTasksIncludingCancelled.isEmpty()) break; + if (time.isExpired() && !openTasksCancellable.isEmpty()) { + LOG.warn("Aborting " + openTasksCancellable.size() + " incomplete task(s) before rebinding again: " + openTasksCancellable); + openTasksCancellable.forEach(t -> t.cancel(true)); } if (time.getDurationElapsed().isShorterThan(Duration.millis(200))) { - LOG.info("Waiting on " + openTasks.size() + " task(s) before rebinding again: " + openTasks); + LOG.info("Waiting on " + openTasksIncludingCancelled.size() + " task(s) before rebinding again: " + openTasksIncludingCancelled); } - LOG.debug("Waiting on " + openTasks.size() + " task(s) before rebinding again, details: " + - openTasks.stream().map(t -> ""+t+"("+BrooklynTaskTags.getContextEntity(t)+")").collect(Collectors.toList())); + LOG.debug("Waiting on " + openTasksIncludingCancelled.size() + " task(s) before rebinding again, details: " + + openTasksIncludingCancelled.stream().map(t -> ""+t+"("+BrooklynTaskTags.getContextEntity(t)+")").collect(Collectors.toList())); Time.sleep(Duration.millis(200)); } while (true); entityTasks.forEach(((BasicExecutionManager) managementContext.getExecutionManager())::deleteTask); List<Task<?>> otherDoneTasks = ((BasicExecutionManager) managementContext.getExecutionManager()).allTasksLive() - .stream().filter(Task::isDone).collect(Collectors.toList()); + .stream().filter(t -> t.isDone(true)).collect(Collectors.toList()); otherDoneTasks.forEach(((BasicExecutionManager) managementContext.getExecutionManager())::deleteTask); } @@ -420,6 +421,7 @@ public abstract class RebindIteration { for (ManagedBundleMemento bundleMemento : mementoManifest.getBundles().values()) { ManagedBundle managedBundle = instantiator.newManagedBundle(bundleMemento); bundles.put(managedBundle.getVersionedName(), new InstallableManagedBundleImpl(bundleMemento, managedBundle)); + logRebindingDebug("Registering bundle "+bundleMemento.getId()+": "+managedBundle); rebindContext.registerBundle(bundleMemento.getId(), managedBundle); } } else { diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java index b7e5adf..995c49e 100644 --- a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java +++ b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java @@ -390,9 +390,15 @@ public class BasicExecutionManager implements ExecutionManager { if (removed!=null && removed.isSubmitted() && !removed.isDone(true)) { Entity context = BrooklynTaskTags.getContextEntity(removed); if (context!=null && !Entities.isManaged(context)) { - log.debug("Forgetting about active task on unmanagement of "+context+": "+removed); + log.debug("Deleting active task on unmanagement of "+context+": "+removed); } else { - log.warn("Deleting submitted task before completion: "+removed+" (tags "+removed.getTags()+"); this task will continue to run in the background outwith "+this+", but perhaps it should have been cancelled?"); + if (removed.isDone()) { + log.debug("Deleting cancelled task before completion: " + removed + "; this task will continue to run in the background outwith " + this); + } else { + log.warn("Deleting submitted task before completion: " + removed + " (tags " + removed.getTags() + "); this task will continue to run in the background outwith " + this + ", but perhaps it should have been cancelled?"); + + } + log.debug("Active task deletion trace", new Throwable("Active task deletion trace")); } } task.getTags().forEach(t -> {
