This is an automated email from the ASF dual-hosted git repository. heneveld pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/brooklyn-server.git
commit 109876b10579641964aea9627e67259e04dd13e7 Author: Alex Heneveld <[email protected]> AuthorDate: Tue Sep 14 14:08:40 2021 +0100 stop tasks when ending RO mode also, more cleanly restart on rebind/promotion --- .../core/mgmt/ha/HighAvailabilityManagerImpl.java | 5 ++ .../brooklyn/core/mgmt/rebind/RebindIteration.java | 3 +- .../core/mgmt/rebind/RebindManagerImpl.java | 54 +++++++++++++++++----- .../core/mgmt/rebind/ManagementPlaneIdTest.java | 6 +-- 4 files changed, 53 insertions(+), 15 deletions(-) diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/ha/HighAvailabilityManagerImpl.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/ha/HighAvailabilityManagerImpl.java index 00f511a..d3ad012 100644 --- a/core/src/main/java/org/apache/brooklyn/core/mgmt/ha/HighAvailabilityManagerImpl.java +++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/ha/HighAvailabilityManagerImpl.java @@ -990,6 +990,9 @@ public class HighAvailabilityManagerImpl implements HighAvailabilityManager { * (e.g. during the periodic rebind as hot_stanby we will not repeatedly clear the brooklyn-managed-bundles). */ protected void clearManagedItems(ManagementTransitionMode mode) { + // note, tasks are cancelled prior to this, when coming from RO mode, via + // RebindManagerImpl.stopEntityAndDoneTasksBeforeRebinding + // log this because it may be surprising, it is just HA transitions, // not symmetric with usual single-node start LOG.info("Clearing all managed items on transition to "+mode); @@ -1018,6 +1021,8 @@ public class HighAvailabilityManagerImpl implements HighAvailabilityManager { ((BasicBrooklynCatalog)managementContext.getCatalog()).reset(CatalogDto.newEmptyInstance("<reset-by-ha-status-change>")); ((BasicBrooklynTypeRegistry)managementContext.getTypeRegistry()).clear(); managementContext.getCatalogInitialization().clearBrooklynManagedBundles(); + + ((LocalManagementContext)managementContext).getGarbageCollector().gcTasks(); } /** Starts hot standby or hot backup, in foreground diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindIteration.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindIteration.java index 69dd588..bf28d65 100644 --- a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindIteration.java +++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindIteration.java @@ -265,7 +265,7 @@ public abstract class RebindIteration { protected void doRun() throws Exception { if (readOnlyRebindCount.get() > 1) { // prevent leaking - rebindManager.stopEntityAndDoneTasksBeforeRebinding(); + rebindManager.stopEntityAndDoneTasksBeforeRebinding("before next read-only rebind", Duration.seconds(10), Duration.seconds(20)); } loadManifestFiles(); @@ -560,6 +560,7 @@ public abstract class RebindIteration { try { Feed feed = instantiator.newFeed(feedMemento); rebindContext.registerFeed(feedMemento.getId(), feed); + // started during associateAdjunctsWithEntities by RebindAdjuncts } catch (Exception e) { exceptionHandler.onCreateFailed(BrooklynObjectType.FEED, feedMemento.getId(), feedMemento.getType(), e); } diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerImpl.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerImpl.java index 2db7063..781a8c1 100644 --- a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerImpl.java +++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerImpl.java @@ -47,6 +47,7 @@ import org.apache.brooklyn.api.mgmt.rebind.mementos.TreeNode; import org.apache.brooklyn.api.objs.BrooklynObject; import org.apache.brooklyn.config.ConfigKey; import org.apache.brooklyn.core.BrooklynFeatureEnablement; +import org.apache.brooklyn.core.BrooklynVersion; import org.apache.brooklyn.core.config.ConfigKeys; import org.apache.brooklyn.core.enricher.AbstractEnricher; import org.apache.brooklyn.core.entity.Entities; @@ -398,29 +399,60 @@ public class RebindManagerImpl implements RebindManager { readOnlyTask = null; LOG.debug("Stopped read-only rebinding ("+this+"), mgmt "+managementContext.getManagementNodeId()); } - stopEntityAndDoneTasksBeforeRebinding(); + // short waits when promoting + stopEntityAndDoneTasksBeforeRebinding("when stopping hot proxy read-only mode", + Duration.seconds(2), + Duration.seconds(5)); + // note, items are subsequently unmanaged via: + // HighAvailabilityManagerImpl.clearManagedItems } - public void stopEntityAndDoneTasksBeforeRebinding() { + public void stopEntityAndDoneTasksBeforeRebinding(String reason, Duration delayBeforeCancelling, Duration delayBeforeAbandoning) { + // TODO inputs should be configurable + + if (!managementContext.isRunning() || managementContext.getExecutionManager().isShutdown()) { + return; + } + // wait for tasks Collection<Task<?>> entityTasks = ((BasicExecutionManager) managementContext.getExecutionManager()).allTasksLive() .stream().filter(t -> BrooklynTaskTags.getContextEntity(t) != null).collect(Collectors.toList()); List<Task<?>> openTasksIncludingCancelled; - CountdownTimer time = CountdownTimer.newInstanceStarted(Duration.seconds(15)); + CountdownTimer timeBeforeCancelling = CountdownTimer.newInstanceStarted(delayBeforeCancelling); + CountdownTimer timeBeforeAbandoning = CountdownTimer.newInstanceStarted(delayBeforeAbandoning); + Duration backoff = Duration.millis(10); do { openTasksIncludingCancelled = entityTasks.stream().filter(t -> !t.isDone(true)).collect(Collectors.toList()); - List<Task<?>> openTasksCancellable = openTasksIncludingCancelled.stream().filter(t -> !t.isDone()).collect(Collectors.toList()); if (openTasksIncludingCancelled.isEmpty()) break; - if (time.isExpired() && !openTasksCancellable.isEmpty()) { - LOG.warn("Aborting " + openTasksCancellable.size() + " incomplete task(s) before rebinding again: " + openTasksCancellable); + + List<Task<?>> openTasksCancellable = openTasksIncludingCancelled.stream().filter(t -> !t.isDone()).collect(Collectors.toList()); + List<Task<?>> openTasksScheduled = openTasksCancellable.stream().filter(t -> t instanceof ScheduledTask).collect(Collectors.toList()); + + if (!openTasksScheduled.isEmpty()) { + // stop scheduled tasks immediately + openTasksScheduled.forEach(t -> t.cancel(false)); + continue; + } + + if (timeBeforeCancelling!=null && timeBeforeCancelling.isExpired() && !openTasksCancellable.isEmpty()) { + LOG.warn("Aborting " + openTasksCancellable.size() + " incomplete task(s) "+reason+": " + openTasksCancellable); openTasksCancellable.forEach(t -> t.cancel(true)); + timeBeforeCancelling = null; } - if (time.getDurationElapsed().isShorterThan(Duration.millis(200))) { - LOG.info("Waiting on " + openTasksIncludingCancelled.size() + " task(s) before rebinding again: " + openTasksIncludingCancelled); + + if (timeBeforeAbandoning.isExpired()) break; + + if (timeBeforeAbandoning.getDurationElapsed().isShorterThan(backoff)) { + LOG.info("Waiting on " + openTasksIncludingCancelled.size() + " task(s) "+reason+": " + openTasksIncludingCancelled); } - LOG.debug("Waiting on " + openTasksIncludingCancelled.size() + " task(s) before rebinding again, details: " + - openTasksIncludingCancelled.stream().map(t -> ""+t+"("+BrooklynTaskTags.getContextEntity(t)+")").collect(Collectors.toList())); - Time.sleep(Duration.millis(200)); + if (LOG.isDebugEnabled()) { + LOG.debug("Waiting on " + openTasksIncludingCancelled.size() + " task(s) " + reason + ", details: " + + openTasksIncludingCancelled.stream().map(t -> "" + t + "(" + BrooklynTaskTags.getContextEntity(t) + ")").collect(Collectors.toList())); + } + + Time.sleep(Duration.min(timeBeforeAbandoning.getDurationRemaining(), backoff)); + backoff = Duration.min(backoff.multiply(2), Duration.millis(200)); + } while (true); entityTasks.forEach(((BasicExecutionManager) managementContext.getExecutionManager())::deleteTask); diff --git a/core/src/test/java/org/apache/brooklyn/core/mgmt/rebind/ManagementPlaneIdTest.java b/core/src/test/java/org/apache/brooklyn/core/mgmt/rebind/ManagementPlaneIdTest.java index ea3ee5e..c24edcd 100644 --- a/core/src/test/java/org/apache/brooklyn/core/mgmt/rebind/ManagementPlaneIdTest.java +++ b/core/src/test/java/org/apache/brooklyn/core/mgmt/rebind/ManagementPlaneIdTest.java @@ -85,7 +85,7 @@ public class ManagementPlaneIdTest { checkPlaneIdPersisted(mgmt); } - @Test + @Test(groups="Integration") // because slow public void testPlaneIdRolledBack() throws Exception { final LocalManagementContext mgmt = createManagementContext(PersistMode.AUTO, HighAvailabilityMode.AUTO); @@ -101,7 +101,7 @@ public class ManagementPlaneIdTest { }); } - @Test + @Test(groups="Integration") // because slow public void testColdRebindInitialisesPlaneId() throws Exception { final LocalManagementContext origMgmt = createManagementContext(PersistMode.AUTO, HighAvailabilityMode.DISABLED); checkPlaneIdPersisted(origMgmt); @@ -136,7 +136,7 @@ public class ManagementPlaneIdTest { }); } - @Test + @Test(groups="Integration") // because slow public void testHaFailoverKeepsPlaneId() throws Exception { final LocalManagementContext origMgmt = createManagementContext(PersistMode.AUTO, HighAvailabilityMode.MASTER); final LocalManagementContext rebindMgmt = createManagementContext(PersistMode.AUTO, HighAvailabilityMode.STANDBY);
