This is an automated email from the ASF dual-hosted git repository. heneveld pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/brooklyn-server.git
commit 751b7ff77919a46728faa98f8e203314cb9d870b Author: Alex Heneveld <[email protected]> AuthorDate: Tue Sep 14 13:52:22 2021 +0100 better cleanup on switch from RO/hot --- .../brooklyn/core/mgmt/rebind/RebindIteration.java | 28 ++------------- .../core/mgmt/rebind/RebindManagerImpl.java | 42 +++++++++++++++++++++- 2 files changed, 43 insertions(+), 27 deletions(-) diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindIteration.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindIteration.java index cfbd400..69dd588 100644 --- a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindIteration.java +++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindIteration.java @@ -264,32 +264,8 @@ public abstract class RebindIteration { protected void doRun() throws Exception { if (readOnlyRebindCount.get() > 1) { - // wait for tasks - Collection<Task<?>> entityTasks = ((BasicExecutionManager) managementContext.getExecutionManager()).allTasksLive() - .stream().filter(t -> BrooklynTaskTags.getContextEntity(t) != null).collect(Collectors.toList()); - List<Task<?>> openTasksIncludingCancelled; - CountdownTimer time = CountdownTimer.newInstanceStarted(Duration.seconds(15)); - do { - openTasksIncludingCancelled = entityTasks.stream().filter(t -> !t.isDone(true)).collect(Collectors.toList()); - List<Task<?>> openTasksCancellable = openTasksIncludingCancelled.stream().filter(t -> !t.isDone()).collect(Collectors.toList()); - if (openTasksIncludingCancelled.isEmpty()) break; - if (time.isExpired() && !openTasksCancellable.isEmpty()) { - LOG.warn("Aborting " + openTasksCancellable.size() + " incomplete task(s) before rebinding again: " + openTasksCancellable); - openTasksCancellable.forEach(t -> t.cancel(true)); - } - if (time.getDurationElapsed().isShorterThan(Duration.millis(200))) { - LOG.info("Waiting on " + openTasksIncludingCancelled.size() + " task(s) before rebinding again: " + openTasksIncludingCancelled); - } - LOG.debug("Waiting on " + openTasksIncludingCancelled.size() + " task(s) before rebinding again, details: " + - openTasksIncludingCancelled.stream().map(t -> ""+t+"("+BrooklynTaskTags.getContextEntity(t)+")").collect(Collectors.toList())); - Time.sleep(Duration.millis(200)); - } while (true); - - entityTasks.forEach(((BasicExecutionManager) managementContext.getExecutionManager())::deleteTask); - - List<Task<?>> otherDoneTasks = ((BasicExecutionManager) managementContext.getExecutionManager()).allTasksLive() - .stream().filter(t -> t.isDone(true)).collect(Collectors.toList()); - otherDoneTasks.forEach(((BasicExecutionManager) managementContext.getExecutionManager())::deleteTask); + // prevent leaking + rebindManager.stopEntityAndDoneTasksBeforeRebinding(); } loadManifestFiles(); diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerImpl.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerImpl.java index 57395c7..2db7063 100644 --- a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerImpl.java +++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerImpl.java @@ -20,6 +20,7 @@ package org.apache.brooklyn.core.mgmt.rebind; import static com.google.common.base.Preconditions.checkNotNull; +import java.util.Collection; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -29,6 +30,7 @@ import java.util.concurrent.Semaphore; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; import org.apache.brooklyn.api.entity.Application; import org.apache.brooklyn.api.entity.Entity; import org.apache.brooklyn.api.mgmt.ExecutionContext; @@ -48,7 +50,9 @@ import org.apache.brooklyn.core.BrooklynFeatureEnablement; import org.apache.brooklyn.core.config.ConfigKeys; import org.apache.brooklyn.core.enricher.AbstractEnricher; import org.apache.brooklyn.core.entity.Entities; +import org.apache.brooklyn.core.mgmt.BrooklynTaskTags; import org.apache.brooklyn.core.mgmt.ha.HighAvailabilityManagerImpl; +import org.apache.brooklyn.core.mgmt.internal.LocalManagementContext; import org.apache.brooklyn.core.mgmt.internal.ManagementContextInternal; import org.apache.brooklyn.core.mgmt.persist.BrooklynMementoPersisterToObjectStore; import org.apache.brooklyn.core.mgmt.persist.BrooklynPersistenceUtils; @@ -61,11 +65,14 @@ import org.apache.brooklyn.util.collections.MutableMap; import org.apache.brooklyn.util.collections.QuorumCheck; import org.apache.brooklyn.util.collections.QuorumCheck.QuorumChecks; import org.apache.brooklyn.util.core.task.BasicExecutionContext; +import org.apache.brooklyn.util.core.task.BasicExecutionManager; import org.apache.brooklyn.util.core.task.ScheduledTask; import org.apache.brooklyn.util.core.task.Tasks; import org.apache.brooklyn.util.exceptions.Exceptions; import org.apache.brooklyn.util.exceptions.RuntimeInterruptedException; +import org.apache.brooklyn.util.time.CountdownTimer; import org.apache.brooklyn.util.time.Duration; +import org.apache.brooklyn.util.time.Time; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -148,7 +155,7 @@ public class RebindManagerImpl implements RebindManager { private PersistenceActivityMetrics persistMetrics = new PersistenceActivityMetrics(); Integer firstRebindAppCount, firstRebindEntityCount, firstRebindItemCount; - + /** * For tracking if rebinding, for {@link AbstractEnricher#isRebinding()} etc. * @@ -391,6 +398,39 @@ public class RebindManagerImpl implements RebindManager { readOnlyTask = null; LOG.debug("Stopped read-only rebinding ("+this+"), mgmt "+managementContext.getManagementNodeId()); } + stopEntityAndDoneTasksBeforeRebinding(); + } + + public void stopEntityAndDoneTasksBeforeRebinding() { + // wait for tasks + Collection<Task<?>> entityTasks = ((BasicExecutionManager) managementContext.getExecutionManager()).allTasksLive() + .stream().filter(t -> BrooklynTaskTags.getContextEntity(t) != null).collect(Collectors.toList()); + List<Task<?>> openTasksIncludingCancelled; + CountdownTimer time = CountdownTimer.newInstanceStarted(Duration.seconds(15)); + do { + openTasksIncludingCancelled = entityTasks.stream().filter(t -> !t.isDone(true)).collect(Collectors.toList()); + List<Task<?>> openTasksCancellable = openTasksIncludingCancelled.stream().filter(t -> !t.isDone()).collect(Collectors.toList()); + if (openTasksIncludingCancelled.isEmpty()) break; + if (time.isExpired() && !openTasksCancellable.isEmpty()) { + LOG.warn("Aborting " + openTasksCancellable.size() + " incomplete task(s) before rebinding again: " + openTasksCancellable); + openTasksCancellable.forEach(t -> t.cancel(true)); + } + if (time.getDurationElapsed().isShorterThan(Duration.millis(200))) { + LOG.info("Waiting on " + openTasksIncludingCancelled.size() + " task(s) before rebinding again: " + openTasksIncludingCancelled); + } + LOG.debug("Waiting on " + openTasksIncludingCancelled.size() + " task(s) before rebinding again, details: " + + openTasksIncludingCancelled.stream().map(t -> ""+t+"("+BrooklynTaskTags.getContextEntity(t)+")").collect(Collectors.toList())); + Time.sleep(Duration.millis(200)); + } while (true); + + entityTasks.forEach(((BasicExecutionManager) managementContext.getExecutionManager())::deleteTask); + + List<Task<?>> otherDoneTasks = ((BasicExecutionManager) managementContext.getExecutionManager()).allTasksLive() + .stream().filter(t -> t.isDone(true)).collect(Collectors.toList()); + otherDoneTasks.forEach(((BasicExecutionManager) managementContext.getExecutionManager())::deleteTask); + + // also collect tasks, so that unmanaged entities are cleared before next run + ((LocalManagementContext)managementContext).getGarbageCollector().gcTasks(); } @Override
