This is an automated email from the ASF dual-hosted git repository.

heneveld pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brooklyn-server.git

commit 109876b10579641964aea9627e67259e04dd13e7
Author: Alex Heneveld <[email protected]>
AuthorDate: Tue Sep 14 14:08:40 2021 +0100

    stop tasks when ending RO mode also, more cleanly restart on rebind/promotion
---
 .../core/mgmt/ha/HighAvailabilityManagerImpl.java  |  5 ++
 .../brooklyn/core/mgmt/rebind/RebindIteration.java |  3 +-
 .../core/mgmt/rebind/RebindManagerImpl.java        | 54 +++++++++++++++++-----
 .../core/mgmt/rebind/ManagementPlaneIdTest.java    |  6 +--
 4 files changed, 53 insertions(+), 15 deletions(-)

diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/ha/HighAvailabilityManagerImpl.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/ha/HighAvailabilityManagerImpl.java
index 00f511a..d3ad012 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/ha/HighAvailabilityManagerImpl.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/ha/HighAvailabilityManagerImpl.java
@@ -990,6 +990,9 @@ public class HighAvailabilityManagerImpl implements HighAvailabilityManager {
      * (e.g. during the periodic rebind as hot_stanby we will not repeatedly 
clear the brooklyn-managed-bundles).
      */
     protected void clearManagedItems(ManagementTransitionMode mode) {
+        // note, tasks are cancelled prior to this, when coming from RO mode, 
via
+        // RebindManagerImpl.stopEntityAndDoneTasksBeforeRebinding
+
         // log this because it may be surprising, it is just HA transitions,
         // not symmetric with usual single-node start
         LOG.info("Clearing all managed items on transition to "+mode);
@@ -1018,6 +1021,8 @@ public class HighAvailabilityManagerImpl implements HighAvailabilityManager {
         
((BasicBrooklynCatalog)managementContext.getCatalog()).reset(CatalogDto.newEmptyInstance("<reset-by-ha-status-change>"));
         
((BasicBrooklynTypeRegistry)managementContext.getTypeRegistry()).clear();
         
managementContext.getCatalogInitialization().clearBrooklynManagedBundles();
+
+        
((LocalManagementContext)managementContext).getGarbageCollector().gcTasks();
     }
     
     /** Starts hot standby or hot backup, in foreground
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindIteration.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindIteration.java
index 69dd588..bf28d65 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindIteration.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindIteration.java
@@ -265,7 +265,7 @@ public abstract class RebindIteration {
     protected void doRun() throws Exception {
         if (readOnlyRebindCount.get() > 1) {
             // prevent leaking
-            rebindManager.stopEntityAndDoneTasksBeforeRebinding();
+            rebindManager.stopEntityAndDoneTasksBeforeRebinding("before next 
read-only rebind", Duration.seconds(10), Duration.seconds(20));
         }
 
         loadManifestFiles();
@@ -560,6 +560,7 @@ public abstract class RebindIteration {
                 try {
                     Feed feed = instantiator.newFeed(feedMemento);
                     rebindContext.registerFeed(feedMemento.getId(), feed);
+                    // started during associateAdjunctsWithEntities by 
RebindAdjuncts
                 } catch (Exception e) {
                     exceptionHandler.onCreateFailed(BrooklynObjectType.FEED, 
feedMemento.getId(), feedMemento.getType(), e);
                 }
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerImpl.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerImpl.java
index 2db7063..781a8c1 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerImpl.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerImpl.java
@@ -47,6 +47,7 @@ import org.apache.brooklyn.api.mgmt.rebind.mementos.TreeNode;
 import org.apache.brooklyn.api.objs.BrooklynObject;
 import org.apache.brooklyn.config.ConfigKey;
 import org.apache.brooklyn.core.BrooklynFeatureEnablement;
+import org.apache.brooklyn.core.BrooklynVersion;
 import org.apache.brooklyn.core.config.ConfigKeys;
 import org.apache.brooklyn.core.enricher.AbstractEnricher;
 import org.apache.brooklyn.core.entity.Entities;
@@ -398,29 +399,60 @@ public class RebindManagerImpl implements RebindManager {
             readOnlyTask = null;
             LOG.debug("Stopped read-only rebinding ("+this+"), mgmt 
"+managementContext.getManagementNodeId());
         }
-        stopEntityAndDoneTasksBeforeRebinding();
+        // short waits when promoting
+        stopEntityAndDoneTasksBeforeRebinding("when stopping hot proxy 
read-only mode",
+                Duration.seconds(2),
+                Duration.seconds(5));
+        // note, items are subsequently unmanaged via:
+        // HighAvailabilityManagerImpl.clearManagedItems
     }
 
-    public void stopEntityAndDoneTasksBeforeRebinding() {
+    public void stopEntityAndDoneTasksBeforeRebinding(String reason, Duration 
delayBeforeCancelling, Duration delayBeforeAbandoning) {
+        // TODO inputs should be configurable
+
+        if (!managementContext.isRunning() || 
managementContext.getExecutionManager().isShutdown()) {
+            return;
+        }
+
         // wait for tasks
         Collection<Task<?>> entityTasks = ((BasicExecutionManager) 
managementContext.getExecutionManager()).allTasksLive()
                 .stream().filter(t -> BrooklynTaskTags.getContextEntity(t) != 
null).collect(Collectors.toList());
         List<Task<?>> openTasksIncludingCancelled;
-        CountdownTimer time = 
CountdownTimer.newInstanceStarted(Duration.seconds(15));
+        CountdownTimer timeBeforeCancelling = 
CountdownTimer.newInstanceStarted(delayBeforeCancelling);
+        CountdownTimer timeBeforeAbandoning = 
CountdownTimer.newInstanceStarted(delayBeforeAbandoning);
+        Duration backoff = Duration.millis(10);
         do {
             openTasksIncludingCancelled = entityTasks.stream().filter(t -> 
!t.isDone(true)).collect(Collectors.toList());
-            List<Task<?>> openTasksCancellable = 
openTasksIncludingCancelled.stream().filter(t -> 
!t.isDone()).collect(Collectors.toList());
             if (openTasksIncludingCancelled.isEmpty()) break;
-            if (time.isExpired() && !openTasksCancellable.isEmpty()) {
-                LOG.warn("Aborting " + openTasksCancellable.size() + " 
incomplete task(s) before rebinding again: " + openTasksCancellable);
+
+            List<Task<?>> openTasksCancellable = 
openTasksIncludingCancelled.stream().filter(t -> 
!t.isDone()).collect(Collectors.toList());
+            List<Task<?>> openTasksScheduled = 
openTasksCancellable.stream().filter(t -> t instanceof 
ScheduledTask).collect(Collectors.toList());
+
+            if (!openTasksScheduled.isEmpty()) {
+                // stop scheduled tasks immediately
+                openTasksScheduled.forEach(t -> t.cancel(false));
+                continue;
+            }
+
+            if (timeBeforeCancelling!=null && timeBeforeCancelling.isExpired() 
&& !openTasksCancellable.isEmpty()) {
+                LOG.warn("Aborting " + openTasksCancellable.size() + " 
incomplete task(s) "+reason+": " + openTasksCancellable);
                 openTasksCancellable.forEach(t -> t.cancel(true));
+                timeBeforeCancelling = null;
             }
-            if (time.getDurationElapsed().isShorterThan(Duration.millis(200))) 
{
-                LOG.info("Waiting on " + openTasksIncludingCancelled.size() + 
" task(s) before rebinding again: " + openTasksIncludingCancelled);
+
+            if (timeBeforeAbandoning.isExpired()) break;
+
+            if 
(timeBeforeAbandoning.getDurationElapsed().isShorterThan(backoff)) {
+                LOG.info("Waiting on " + openTasksIncludingCancelled.size() + 
" task(s) "+reason+": " + openTasksIncludingCancelled);
             }
-            LOG.debug("Waiting on " + openTasksIncludingCancelled.size() + " 
task(s) before rebinding again, details: " +
-                    openTasksIncludingCancelled.stream().map(t -> 
""+t+"("+BrooklynTaskTags.getContextEntity(t)+")").collect(Collectors.toList()));
-            Time.sleep(Duration.millis(200));
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Waiting on " + openTasksIncludingCancelled.size() + 
" task(s) " + reason + ", details: " +
+                        openTasksIncludingCancelled.stream().map(t -> "" + t + 
"(" + BrooklynTaskTags.getContextEntity(t) + ")").collect(Collectors.toList()));
+            }
+
+            
Time.sleep(Duration.min(timeBeforeAbandoning.getDurationRemaining(), backoff));
+            backoff = Duration.min(backoff.multiply(2), Duration.millis(200));
+
         } while (true);
 
         entityTasks.forEach(((BasicExecutionManager) 
managementContext.getExecutionManager())::deleteTask);
diff --git a/core/src/test/java/org/apache/brooklyn/core/mgmt/rebind/ManagementPlaneIdTest.java b/core/src/test/java/org/apache/brooklyn/core/mgmt/rebind/ManagementPlaneIdTest.java
index ea3ee5e..c24edcd 100644
--- a/core/src/test/java/org/apache/brooklyn/core/mgmt/rebind/ManagementPlaneIdTest.java
+++ b/core/src/test/java/org/apache/brooklyn/core/mgmt/rebind/ManagementPlaneIdTest.java
@@ -85,7 +85,7 @@ public class ManagementPlaneIdTest {
         checkPlaneIdPersisted(mgmt);
     }
 
-    @Test
+    @Test(groups="Integration")  // because slow
     public void testPlaneIdRolledBack() throws Exception {
         final LocalManagementContext mgmt = 
createManagementContext(PersistMode.AUTO, HighAvailabilityMode.AUTO);
 
@@ -101,7 +101,7 @@ public class ManagementPlaneIdTest {
         });
     }
 
-    @Test
+    @Test(groups="Integration")  // because slow
     public void testColdRebindInitialisesPlaneId() throws Exception {
         final LocalManagementContext origMgmt = 
createManagementContext(PersistMode.AUTO, HighAvailabilityMode.DISABLED);
         checkPlaneIdPersisted(origMgmt);
@@ -136,7 +136,7 @@ public class ManagementPlaneIdTest {
         });
     }
 
-    @Test
+    @Test(groups="Integration")  // because slow
     public void testHaFailoverKeepsPlaneId() throws Exception {
         final LocalManagementContext origMgmt = 
createManagementContext(PersistMode.AUTO, HighAvailabilityMode.MASTER);
         final LocalManagementContext rebindMgmt = 
createManagementContext(PersistMode.AUTO, HighAvailabilityMode.STANDBY);

Reply via email to