This is an automated email from the ASF dual-hosted git repository.

heneveld pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brooklyn-server.git

commit c1991d1c440f578305c053dad3ce2f53b518d761
Author: Alex Heneveld <[email protected]>
AuthorDate: Fri Sep 10 17:52:37 2021 +0100

    lots more exclusions for read-only mode, and ability to interrogate them
    
    - no subscriptions
    - not added to persistence queue
---
 .../brooklyn/api/mgmt/rebind/RebindManager.java    |  4 +-
 .../catalog/internal/BasicBrooklynCatalog.java     |  9 ++-
 .../brooklyn/core/entity/AbstractEntity.java       | 21 ++++---
 .../org/apache/brooklyn/core/entity/Entities.java  |  4 +-
 .../brooklyn/core/mgmt/BrooklynTaskTags.java       |  1 +
 .../mgmt/internal/AbstractSubscriptionManager.java | 10 ++++
 .../mgmt/internal/LocalSubscriptionManager.java    |  5 ++
 .../internal/NonDeploymentManagementContext.java   |  7 ++-
 .../mgmt/rebind/PeriodicDeltaChangeListener.java   | 14 +++--
 .../brooklyn/core/mgmt/rebind/RebindIteration.java | 69 +++++++++++++++++++++-
 .../core/mgmt/rebind/RebindManagerImpl.java        | 12 +++-
 .../brooklyn/core/objs/AbstractEntityAdjunct.java  |  3 +-
 .../util/core/task/BasicExecutionManager.java      | 10 ++++
 .../brooklyn/core/mgmt/ha/HotStandbyTest.java      | 46 +++++++++------
 .../policy/failover/ElectPrimaryPolicy.java        |  4 +-
 .../java/org/apache/brooklyn/test/Asserts.java     |  9 ++-
 16 files changed, 182 insertions(+), 46 deletions(-)

diff --git a/api/src/main/java/org/apache/brooklyn/api/mgmt/rebind/RebindManager.java b/api/src/main/java/org/apache/brooklyn/api/mgmt/rebind/RebindManager.java
index 7ad3191..d9d4422 100644
--- a/api/src/main/java/org/apache/brooklyn/api/mgmt/rebind/RebindManager.java
+++ b/api/src/main/java/org/apache/brooklyn/api/mgmt/rebind/RebindManager.java
@@ -97,6 +97,8 @@ public interface RebindManager {
      * Interrupts any current activity and waits for it to cease. */
     public void stopReadOnly();
 
+    public boolean isReadOnly();
+
     /**
      * Resets the effects of previously being read-only, ready to be used again (e.g. when promoting to master).
      * Expected to be called after {@link #stopReadOnly()} (thus long after {@link #setPersister(BrooklynMementoPersister)}, 
@@ -105,7 +107,7 @@ public interface RebindManager {
     public void reset();
 
     /** Starts the appropriate background processes, {@link #startPersistence()} if {@link ManagementNodeState#MASTER},
-     * {@link #startReadOnly()} if {@link ManagementNodeState#HOT_STANDBY} or {@link ManagementNodeState#HOT_BACKUP} */
+     * {@link #startReadOnly(ManagementNodeState)} if {@link ManagementNodeState#HOT_STANDBY} or {@link ManagementNodeState#HOT_BACKUP} */
     public void start();
     /** Stops the appropriate background processes, {@link #stopPersistence()} or {@link #stopReadOnly()},
      * waiting for activity there to cease (interrupting in the case of {@link #stopReadOnly()}). */
diff --git a/core/src/main/java/org/apache/brooklyn/core/catalog/internal/BasicBrooklynCatalog.java b/core/src/main/java/org/apache/brooklyn/core/catalog/internal/BasicBrooklynCatalog.java
index da334d0..71098f3 100644
--- a/core/src/main/java/org/apache/brooklyn/core/catalog/internal/BasicBrooklynCatalog.java
+++ b/core/src/main/java/org/apache/brooklyn/core/catalog/internal/BasicBrooklynCatalog.java
@@ -142,7 +142,9 @@ public class BasicBrooklynCatalog implements BrooklynCatalog {
     private volatile CatalogDo manualAdditionsCatalog;
     private volatile LoadedClassLoader manualAdditionsClasses;
     private final AggregateClassLoader rootClassLoader = AggregateClassLoader.newInstanceWithNoLoaders();
-    
+
+    private static boolean WARNED_RE_DSL_PARSER = false;
+
     /**
      * Cache of specs (used by {@link #peekSpec(CatalogItem)}).
      * We assume that no-one is modifying the catalog items (once added) without going through the
@@ -649,7 +651,10 @@ public class BasicBrooklynCatalog implements BrooklynCatalog {
             }
             
         } else {
-            log.info("No Camp-YAML parser registered for parsing catalog item DSL; skipping DSL-parsing");
+            if (!WARNED_RE_DSL_PARSER) {
+                log.warn("No Camp-YAML parser registered for parsing catalog item DSL; skipping DSL-parsing (no further warnings)");
+                WARNED_RE_DSL_PARSER = true;
+            }
         }
 
         Map<Object,Object> catalogMetadata = MutableMap.<Object, Object>builder()
diff --git a/core/src/main/java/org/apache/brooklyn/core/entity/AbstractEntity.java b/core/src/main/java/org/apache/brooklyn/core/entity/AbstractEntity.java
index 515ed81..7c9607e 100644
--- a/core/src/main/java/org/apache/brooklyn/core/entity/AbstractEntity.java
+++ b/core/src/main/java/org/apache/brooklyn/core/entity/AbstractEntity.java
@@ -966,7 +966,7 @@ public abstract class AbstractEntity extends AbstractBrooklynObject implements E
             if (LOG.isTraceEnabled())
                 LOG.trace(""+AbstractEntity.this+" setAttribute "+attribute+" "+val);
             
-            if (Boolean.TRUE.equals(getManagementSupport().isReadOnlyRaw())) {
+            if (Entities.isReadOnly(AbstractEntity.this)) {
                 T oldVal = getAttribute(attribute);
                 if (Equals.approximately(val, oldVal)) {
                     // ignore, probably an enricher resetting values or something on init
@@ -984,13 +984,18 @@ public abstract class AbstractEntity extends AbstractBrooklynObject implements E
                 }
             }
             T result = attributesInternal.update(attribute, val);
-            if (result == null) {
-                // could be this is a new sensor
-                entityType.addSensorIfAbsent(attribute);
-            }
-            
-            if (!Objects.equal(result, val)) {
-                getManagementSupport().getEntityChangeListener().onAttributeChanged(attribute);
+
+            if (!Entities.isReadOnly(AbstractEntity.this)) {
+                // suppress notifications if read only
+
+                if (result == null) {
+                    // could be this is a new sensor
+                    entityType.addSensorIfAbsent(attribute);
+                }
+
+                if (!Objects.equal(result, val)) {
+                    getManagementSupport().getEntityChangeListener().onAttributeChanged(attribute);
+                }
             }
             
             return result;
diff --git a/core/src/main/java/org/apache/brooklyn/core/entity/Entities.java b/core/src/main/java/org/apache/brooklyn/core/entity/Entities.java
index 4bb5835..43a5104 100644
--- a/core/src/main/java/org/apache/brooklyn/core/entity/Entities.java
+++ b/core/src/main/java/org/apache/brooklyn/core/entity/Entities.java
@@ -18,7 +18,9 @@
  */
 package org.apache.brooklyn.core.entity;
 
+import org.apache.brooklyn.api.objs.BrooklynObject;
 import org.apache.brooklyn.core.mgmt.BrooklynTags;
+import org.apache.brooklyn.core.objs.BrooklynObjectInternal;
 import static org.apache.brooklyn.util.guava.Functionals.isSatisfied;
 
 import java.io.Closeable;
@@ -241,7 +243,7 @@ public class Entities {
     }
 
     /**
-     * @deprecated since 0.7; instead use {@link Sanitizer#IS_SECRET_PREDICATE.apply(Object)}
+     * @deprecated since 0.7; instead use {@link Sanitizer#IS_SECRET_PREDICATE}
      */
     @Deprecated
     public static boolean isSecret(String name) {
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/BrooklynTaskTags.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/BrooklynTaskTags.java
index 2bbd2c8..ea2dd84 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/BrooklynTaskTags.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/BrooklynTaskTags.java
@@ -39,6 +39,7 @@ import org.apache.brooklyn.api.mgmt.Task;
 import org.apache.brooklyn.api.mgmt.entitlement.EntitlementContext;
 import org.apache.brooklyn.api.objs.EntityAdjunct;
 import org.apache.brooklyn.core.config.Sanitizer;
+import org.apache.brooklyn.core.entity.Entities;
 import org.apache.brooklyn.core.entity.EntityInternal;
 import org.apache.brooklyn.core.mgmt.internal.AbstractManagementContext;
 import org.apache.brooklyn.core.objs.AbstractEntityAdjunct;
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/AbstractSubscriptionManager.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/AbstractSubscriptionManager.java
index e7ae59e..eda80c3 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/AbstractSubscriptionManager.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/AbstractSubscriptionManager.java
@@ -30,6 +30,7 @@ import org.apache.brooklyn.api.mgmt.SubscriptionManager;
 import org.apache.brooklyn.api.sensor.Sensor;
 import org.apache.brooklyn.api.sensor.SensorEvent;
 import org.apache.brooklyn.api.sensor.SensorEventListener;
+import org.apache.brooklyn.core.entity.Entities;
 import org.apache.brooklyn.util.text.Strings;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -115,6 +116,11 @@ public abstract class AbstractSubscriptionManager implements SubscriptionManager
     /** @see SubscriptionManager#subscribe(Map, Entity, Sensor, SensorEventListener) */
     @Override
     public final  <T> SubscriptionHandle subscribeToChildren(Map<String, Object> flags, final Entity parent, Sensor<T> sensor, SensorEventListener<? super T> listener) {
+        if (parent!=null && Entities.isReadOnly(parent)) {
+            LOG.trace("Skipping subscription in read only mode, children of {} {} {}", parent, sensor, flags);
+            return new Subscription<>(null, null, null);
+        }
+
         Predicate<SensorEvent<T>> eventFilter = new Predicate<SensorEvent<T>>() {
             @Override
             public boolean apply(SensorEvent<T> input) {
@@ -134,6 +140,10 @@ public abstract class AbstractSubscriptionManager implements SubscriptionManager
     /** @see SubscriptionManager#subscribe(Map, Entity, Sensor, SensorEventListener) */
     @Override
     public final  <T> SubscriptionHandle subscribeToMembers(Map<String, Object> flags, final Group parent, Sensor<T> sensor, SensorEventListener<? super T> listener) {
+        if (parent!=null && Entities.isReadOnly(parent)) {
+            LOG.trace("Skipping subscription in read only mode, members of {} {} {}", parent, sensor, flags);
+            return new Subscription<>(null, null, null);
+        }
         Predicate<SensorEvent<T>> eventFilter = new Predicate<SensorEvent<T>>() {
             @Override
             public boolean apply(SensorEvent<T> input) {
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java
index ab82e47..9b2cd79 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java
@@ -117,6 +117,11 @@ public class LocalSubscriptionManager extends AbstractSubscriptionManager {
     @SuppressWarnings("unchecked")
     protected synchronized <T> SubscriptionHandle subscribe(Map<String, Object> flags, final Subscription<T> s) {
         Entity producer = s.producer;
+        if (producer!=null && Entities.isReadOnly(producer)) {
+            LOG.trace("Skipping subscription in read only mode {} {}", s, flags);
+            return s;
+        }
+
         Sensor<T> sensor= s.sensor;
         s.subscriber = getSubscriber(flags, s);
         s.subscriptionDescription = getSubscriptionDescription(flags, s);
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/NonDeploymentManagementContext.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/NonDeploymentManagementContext.java
index 9653934..5613e91 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/NonDeploymentManagementContext.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/NonDeploymentManagementContext.java
@@ -591,7 +591,12 @@ public class NonDeploymentManagementContext implements ManagementContextInternal
         public void startReadOnly(ManagementNodeState state) {
             throw new IllegalStateException("Non-deployment context "+NonDeploymentManagementContext.this+" is not valid for this operation.");
         }
-        
+
+        @Override
+        public boolean isReadOnly() {
+            return true;
+        }
+
         @Override
         public void stopReadOnly() {
             throw new IllegalStateException("Non-deployment context "+NonDeploymentManagementContext.this+" is not valid for this operation.");
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/PeriodicDeltaChangeListener.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/PeriodicDeltaChangeListener.java
index cf8680c..f9b9be4 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/PeriodicDeltaChangeListener.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/PeriodicDeltaChangeListener.java
@@ -35,6 +35,7 @@ import org.apache.brooklyn.api.mgmt.ExecutionContext;
 import org.apache.brooklyn.api.mgmt.Task;
 import org.apache.brooklyn.api.mgmt.rebind.ChangeListener;
 import org.apache.brooklyn.api.mgmt.rebind.PersistenceExceptionHandler;
+import org.apache.brooklyn.api.mgmt.rebind.RebindManager;
 import org.apache.brooklyn.api.mgmt.rebind.mementos.BrooklynMementoPersister;
 import org.apache.brooklyn.api.objs.BrooklynObject;
 import org.apache.brooklyn.api.objs.BrooklynObjectType;
@@ -180,6 +181,7 @@ public class PeriodicDeltaChangeListener implements ChangeListener {
     }
     
     private final ExecutionContext executionContext;
+    private final RebindManager rebindManager;
     
     private final BrooklynMementoPersister persister;
 
@@ -198,7 +200,7 @@ public class PeriodicDeltaChangeListener implements ChangeListener {
     private final boolean persistEnrichersEnabled;
     private final boolean persistFeedsEnabled;
     private final boolean rePersistReferencedObjectsEnabled;
-    
+
     private final Semaphore persistingMutex = new Semaphore(1);
     private final Object startStopMutex = new Object();
     private final AtomicInteger writeCount = new AtomicInteger(0);
@@ -210,6 +212,7 @@ public class PeriodicDeltaChangeListener implements ChangeListener {
 
     public PeriodicDeltaChangeListener(
             Supplier<String> planeIdSupplier,
+            RebindManager rebindManager,
             ExecutionContext executionContext,
             BrooklynMementoPersister persister,
             PersistenceExceptionHandler exceptionHandler,
@@ -218,6 +221,7 @@ public class PeriodicDeltaChangeListener implements ChangeListener {
         BasicExecutionManager.registerUninterestingTaskName(TASK_NAME,true);
         this.planeIdSupplier = planeIdSupplier;
         this.executionContext = executionContext;
+        this.rebindManager = rebindManager;
         this.persister = persister;
         this.exceptionHandler = exceptionHandler;
         this.metrics = metrics;
@@ -560,8 +564,10 @@ public class PeriodicDeltaChangeListener implements ChangeListener {
     @Override
     public synchronized void onManaged(BrooklynObject instance) {
         if (LOG.isTraceEnabled()) LOG.trace("onManaged: {}", instance);
-        onChanged(instance);
-        addReferencedObjectsForInitialPersist(instance);
+        if (!rebindManager.isReadOnly()) {
+            onChanged(instance);
+            addReferencedObjectsForInitialPersist(instance);
+        }
     }
 
     private void addReferencedObjectsForInitialPersist(BrooklynObject instance) {
@@ -612,7 +618,7 @@ public class PeriodicDeltaChangeListener implements ChangeListener {
     @Override
     public synchronized void onChanged(BrooklynObject instance) {
         if (LOG.isTraceEnabled()) LOG.trace("onChanged: {}", instance);
-        if (!isStopped()) {
+        if (!isStopped() && !rebindManager.isReadOnly()) {
             deltaCollector.add(instance);
         }
     }
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindIteration.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindIteration.java
index 46ae2f8..38f4af7 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindIteration.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindIteration.java
@@ -20,6 +20,8 @@ package org.apache.brooklyn.core.mgmt.rebind;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import org.apache.brooklyn.api.mgmt.Task;
 import static org.apache.brooklyn.core.BrooklynFeatureEnablement.FEATURE_AUTO_FIX_CATALOG_REF_ON_REBIND;
 import static org.apache.brooklyn.core.BrooklynFeatureEnablement.FEATURE_BACKWARDS_COMPATIBILITY_INFER_CATALOG_ITEM_ON_REBIND;
 import static org.apache.brooklyn.core.catalog.internal.CatalogUtils.newClassLoadingContextForCatalogItems;
@@ -83,6 +85,7 @@ import org.apache.brooklyn.core.entity.EntityInternal;
 import org.apache.brooklyn.core.feed.AbstractFeed;
 import org.apache.brooklyn.core.location.AbstractLocation;
 import org.apache.brooklyn.core.location.internal.LocationInternal;
+import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
 import org.apache.brooklyn.core.mgmt.classloading.BrooklynClassLoadingContextSequential;
 import org.apache.brooklyn.core.mgmt.classloading.JavaBrooklynClassLoadingContext;
 import org.apache.brooklyn.core.mgmt.ha.OsgiManager;
@@ -111,6 +114,7 @@ import org.apache.brooklyn.util.collections.MutableMap;
 import org.apache.brooklyn.util.core.ClassLoaderUtils;
 import org.apache.brooklyn.util.core.flags.FlagUtils;
 import org.apache.brooklyn.util.core.task.BasicExecutionContext;
+import org.apache.brooklyn.util.core.task.BasicExecutionManager;
 import org.apache.brooklyn.util.core.task.Tasks;
 import org.apache.brooklyn.util.exceptions.Exceptions;
 import org.apache.brooklyn.util.guava.Maybe;
@@ -118,6 +122,7 @@ import org.apache.brooklyn.util.javalang.Reflections;
 import org.apache.brooklyn.util.osgi.VersionedName;
 import org.apache.brooklyn.util.stream.InputStreamSource;
 import org.apache.brooklyn.util.text.Strings;
+import org.apache.brooklyn.util.time.CountdownTimer;
 import org.apache.brooklyn.util.time.Duration;
 import org.apache.brooklyn.util.time.Time;
 import org.slf4j.Logger;
@@ -256,6 +261,34 @@ public abstract class RebindIteration {
     }
     
     protected void doRun() throws Exception {
+        if (readOnlyRebindCount.get()>1) {
+            // wait for tasks
+            Collection<Task<?>> entityTasks = ((BasicExecutionManager) managementContext.getExecutionManager()).allTasksLive()
+                            .stream().filter(t -> BrooklynTaskTags.getContextEntity(t)!=null).collect(Collectors.toList());
+            List<Task<?>> openTasks;
+            CountdownTimer time = CountdownTimer.newInstanceStarted(Duration.seconds(15));
+            do {
+                openTasks = entityTasks.stream().filter(t -> !t.isDone()).collect(Collectors.toList());
+                if (openTasks.isEmpty()) break;
+                if (time.isExpired()) {
+                    LOG.warn("Aborting "+openTasks.size()+" incomplete task(s) before rebinding again: "+openTasks);
+                    openTasks.forEach(t -> t.cancel(true));
+                }
+                if (time.getDurationElapsed().isShorterThan(Duration.millis(200))) {
+                    LOG.info("Waiting on " + openTasks.size() + " task(s) before rebinding again: " + openTasks);
+                } else {
+                    LOG.debug("Waiting on " + openTasks.size() + " task(s) before rebinding again: " + openTasks);
+                }
+                Time.sleep(Duration.millis(200));
+            } while (true);
+
+            entityTasks.forEach( ((BasicExecutionManager) managementContext.getExecutionManager())::deleteTask );
+
+            List<Task<?>> otherDoneTasks = ((BasicExecutionManager) managementContext.getExecutionManager()).allTasksLive()
+                    .stream().filter(Task::isDone).collect(Collectors.toList());
+            otherDoneTasks.forEach( ((BasicExecutionManager) managementContext.getExecutionManager())::deleteTask );
+        }
+
         loadManifestFiles();
         initPlaneId();
         installBundlesAndRebuildCatalog();
@@ -263,7 +296,7 @@ public abstract class RebindIteration {
         instantiateMementos();
         // adjuncts depend on actual mementos; whereas entity works off special memento manifest, 
         // and location, bundles etc just take type and id
-        instantiateAdjuncts(instantiator); 
+        instantiateAdjuncts(instantiator);
         reconstructEverything();
         associateAdjunctsWithEntities();
         manageTheObjects();
@@ -654,7 +687,6 @@ public abstract class RebindIteration {
     }
 
     protected void associateAdjunctsWithEntities() {
-        
         checkEnteringPhase(7);
 
         logRebindingDebug("RebindManager associating adjuncts to entities");
@@ -684,12 +716,43 @@ public abstract class RebindIteration {
                 
((EntityInternal)entity).getExecutionContext().get(Tasks.<Void>builder()
                         .displayName("rebind-adjuncts-"+entity.getId())
                         .dynamic(false)
-                        .body(body)
+                        .body(new RebindAdjuncts(entityMemento, entity, rebindContext, exceptionHandler))
                         .build());
             }
         }
     }
 
+    protected static class RebindAdjuncts implements Runnable {
+        private EntityMemento entityMemento;
+        private Entity entity;
+        private RebindContextImpl rebindContext;
+        private RebindExceptionHandler exceptionHandler;
+
+        public RebindAdjuncts(EntityMemento entityMemento, Entity entity, RebindContextImpl rebindContext, RebindExceptionHandler exceptionHandler) {
+            this.entityMemento = entityMemento;
+            this.entity = entity;
+            this.rebindContext = rebindContext;
+            this.exceptionHandler = exceptionHandler;
+        }
+
+        @Override
+        public void run() {
+            try {
+                entityMemento.injectTypeClass(entity.getClass());
+                // TODO these call to the entity which in turn sets the entity on the underlying feeds and enrichers;
+                // that is taken as the cue to start, but it should not be. start should be a separate call.
+                ((EntityInternal)entity).getRebindSupport().addPolicies(rebindContext, entityMemento);
+                ((EntityInternal)entity).getRebindSupport().addEnrichers(rebindContext, entityMemento);
+                ((EntityInternal)entity).getRebindSupport().addFeeds(rebindContext, entityMemento);
+
+                entityMemento = null;
+                entity = null;
+            } catch (Exception e) {
+                exceptionHandler.onRebindFailed(BrooklynObjectType.ENTITY, entity, e);
+            }
+        }
+    }
+
     protected void manageTheObjects() {
 
         checkEnteringPhase(8);
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerImpl.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerImpl.java
index 21363be..3a597ab 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerImpl.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerImpl.java
@@ -220,7 +220,12 @@ public class RebindManagerImpl implements RebindManager {
     public boolean isReadOnlyRunning() {
         return readOnlyRunning;
     }
-    
+
+    @Override
+    public boolean isReadOnly() {
+        return isReadOnlyRunning();
+    }
+
     public boolean isReadOnlyStopping() {
         return readOnlyTask!=null && !readOnlyRunning;
     }
@@ -251,6 +256,7 @@ public class RebindManagerImpl implements RebindManager {
         
         this.persistenceRealChangeListener = new PeriodicDeltaChangeListener(
                 new PlaneIdSupplier(),
+                this,
                 managementContext.getServerExecutionContext(),
                 persistenceStoreAccess,
                 exceptionHandler,
@@ -290,7 +296,9 @@ public class RebindManagerImpl implements RebindManager {
         persistenceRunning = true;
         readOnlyRebindCount.set(Integer.MIN_VALUE);
         persistenceStoreAccess.enableWriteAccess();
-        if (persistenceRealChangeListener != null) persistenceRealChangeListener.start();
+        if (persistenceRealChangeListener != null) {
+            persistenceRealChangeListener.start();
+        }
     }
 
     @Override
diff --git a/core/src/main/java/org/apache/brooklyn/core/objs/AbstractEntityAdjunct.java b/core/src/main/java/org/apache/brooklyn/core/objs/AbstractEntityAdjunct.java
index 7d63b0e..2562c46 100644
--- a/core/src/main/java/org/apache/brooklyn/core/objs/AbstractEntityAdjunct.java
+++ b/core/src/main/java/org/apache/brooklyn/core/objs/AbstractEntityAdjunct.java
@@ -18,6 +18,7 @@
  */
 package org.apache.brooklyn.core.objs;
 
+import java.util.concurrent.ConcurrentHashMap;
 import static org.apache.brooklyn.util.JavaGroovyEquivalents.groovyTruth;
 
 import java.util.Collection;
@@ -111,7 +112,7 @@ public abstract class AbstractEntityAdjunct extends AbstractBrooklynObject imple
     @SetFromFlag(value="uniqueTag")
     protected String uniqueTag;
 
-    private Map<String, HighlightTuple> highlights = new HashMap<>();
+    private Map<String, HighlightTuple> highlights = new ConcurrentHashMap<>();
 
     /** Name of a highlight that indicates the last action taken by this adjunct. */
     public static String HIGHLIGHT_NAME_LAST_ACTION = "lastAction";
diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java
index e59e9ed..29984e2 100644
--- a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java
+++ b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java
@@ -47,6 +47,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
+import java.util.stream.Collectors;
 import org.apache.brooklyn.api.entity.Entity;
 import org.apache.brooklyn.api.internal.BrooklynLoggingCategories;
 import org.apache.brooklyn.api.mgmt.ExecutionManager;
@@ -58,6 +59,8 @@ import org.apache.brooklyn.core.BrooklynLogging;
 import org.apache.brooklyn.core.config.Sanitizer;
 import org.apache.brooklyn.core.entity.Entities;
 import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
+import org.apache.brooklyn.core.mgmt.BrooklynTaskTags.WrappedEntity;
+import org.apache.brooklyn.core.mgmt.BrooklynTaskTags.WrappedStream;
 import org.apache.brooklyn.core.mgmt.entitlement.Entitlements;
 import org.apache.brooklyn.util.collections.MutableList;
 import org.apache.brooklyn.util.collections.MutableMap;
@@ -368,6 +371,13 @@ public class BasicExecutionManager implements ExecutionManager {
                 log.warn("Deleting submitted task before completion: "+removed+"; this task will continue to run in the background outwith "+this+", but perhaps it should have been cancelled?");
             }
         }
+        task.getTags().forEach(t -> {
+            // remove tags which might have references to entities etc (help out garbage collector)
+            if (t instanceof TaskInternal) {
+                Set<Object> tagsM = ((TaskInternal) t).getMutableTags();
+                tagsM.removeAll(tagsM.stream().filter(tag -> tag instanceof WrappedStream || tag instanceof WrappedEntity).collect(Collectors.toList()));
+            }
+        });
         return removed != null;
     }
 
diff --git a/core/src/test/java/org/apache/brooklyn/core/mgmt/ha/HotStandbyTest.java b/core/src/test/java/org/apache/brooklyn/core/mgmt/ha/HotStandbyTest.java
index f3bd7fc..f1e7a41 100644
--- a/core/src/test/java/org/apache/brooklyn/core/mgmt/ha/HotStandbyTest.java
+++ b/core/src/test/java/org/apache/brooklyn/core/mgmt/ha/HotStandbyTest.java
@@ -19,6 +19,7 @@
 package org.apache.brooklyn.core.mgmt.ha;
 
 import java.util.Collection;
+import org.apache.brooklyn.api.sensor.AttributeSensor;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
@@ -232,13 +233,10 @@ public class HotStandbyTest {
         assertEquals(appRO.getConfig(TestEntity.CONF_NAME), "first-app");
         assertEquals(appRO.getAttribute(TestEntity.SEQUENCE), (Integer)3);
 
-        try {
-            ((TestApplication)appRO).sensors().set(TestEntity.SEQUENCE, 4);
-            Assert.fail("Should not have allowed sensor to be set");
-        } catch (Exception e) {
-            Assert.assertTrue(e.toString().toLowerCase().contains("read-only"), "Error message did not contain expected text: "+e);
-        }
-        assertEquals(appRO.getAttribute(TestEntity.SEQUENCE), (Integer)3);
+        setSensorOnReadOnly(appRO, TestEntity.SEQUENCE, 4);
+
+        // it will revert to 3
+        EntityAsserts.assertAttributeEqualsEventually(appRO, TestEntity.SEQUENCE, (Integer)3);
     }
 
     @Test
@@ -410,11 +408,13 @@ public class HotStandbyTest {
 //        Jenkins: java.lang.AssertionError: Too much memory used - 158m > max 154m
 //        Svet local: java.lang.AssertionError: Too much memory used - 16m > max 13m
 //    The test is not deterministic so marking as "Manual", i.e. probably shouldn't be included in automated tests.
+    // But it is a very useful check that hot standby is stable!
     @Test(groups={"Integration", "Broken", "Manual"})
     public void testHotStandbyDoesNotLeakBigObjects() throws Exception {
         log.info("Starting test "+JavaClassNames.niceClassAndMethod());
         final int SIZE = 5;
-        final int SIZE_UP_BOUND = SIZE+2;
+        final int SIZE_UP_BOUND1 = SIZE+2;
+        final int SIZE_UP_BOUND2 = SIZE_UP_BOUND1;
         final int SIZE_DOWN_BOUND = SIZE-1;
         final int GRACE = 2;
         // the XML persistence uses a lot of space, we approx at between 2x and 3c
@@ -426,16 +426,18 @@ public class HotStandbyTest {
         noteUsedMemory("Finished seeding");
         Long initialUsed = memoryAssertions.peekLastUsedMemory();
         app.config().set(TestEntity.CONF_OBJECT, new BigObject(SIZE*1000*1000));
-        assertUsedMemoryMaxDelta("Set a big config object", SIZE_UP_BOUND);
+        assertUsedMemoryMaxDelta("Set a big config object", SIZE_UP_BOUND1);
         forcePersistNow(n1);
         assertUsedMemoryMaxDelta("Persisted a big config object", SIZE_IN_XML);
         
         HaMgmtNode n2 = createHotStandby(Duration.PRACTICALLY_FOREVER);
         forceRebindNow(n2);
-        assertUsedMemoryMaxDelta("Rebinded", SIZE_UP_BOUND);
+        assertUsedMemoryMaxDelta("Rebinded", SIZE_UP_BOUND2);
         
-        for (int i=0; i<10; i++)
+        for (int i=0; i<10; i++) {
             forceRebindNow(n2);
+        }
+
         assertUsedMemoryMaxDelta("Several more rebinds", GRACE);
         for (int i=0; i<10; i++) {
             forcePersistNow(n1);
@@ -446,10 +448,10 @@ public class HotStandbyTest {
         app.config().set(TestEntity.CONF_OBJECT, "big is now small");
         assertUsedMemoryMaxDelta("Big made small at primary", -SIZE_DOWN_BOUND);
         forcePersistNow(n1);
-        assertUsedMemoryMaxDelta("And persisted", -SIZE_IN_XML_DOWN);
+        assertUsedMemoryMaxDelta("And persisted", 0); //-SIZE_IN_XML_DOWN);
         
         forceRebindNow(n2);
-        assertUsedMemoryMaxDelta("And at secondary", -SIZE_DOWN_BOUND);
+        assertUsedMemoryMaxDelta("And at secondary", 0); //-SIZE_DOWN_BOUND);
         
         Entities.unmanage(app);
         forcePersistNow(n1);
@@ -543,12 +545,7 @@ public class HotStandbyTest {
         });
 
         Application app2RO = n1.mgmt.lookup(app2.getId(), Application.class);
-        try {
-            ((TestApplication)app2RO).sensors().set(TestEntity.SEQUENCE, 4);
-            Assert.fail("Should not have allowed sensor to be set");
-        } catch (Exception e) {
-            
Assert.assertTrue(e.toString().toLowerCase().contains("read-only"), "Error 
message did not contain expected text: "+e);
-        }
+        setSensorOnReadOnly(app2RO, TestEntity.SEQUENCE, 4);
 
         n1.ha.changeMode(HighAvailabilityMode.AUTO);
         n2.ha.changeMode(HighAvailabilityMode.HOT_STANDBY, true, false);
@@ -574,6 +571,17 @@ public class HotStandbyTest {
         EntityAsserts.assertAttributeEquals(app2BRO, TestEntity.SEQUENCE, 4);
     }
 
+    private <T> void setSensorOnReadOnly(Entity item, AttributeSensor<T> 
sensor, T value) {
+        try {
+            item.sensors().set(sensor, value);
+
+            // we now allow access to the sensors -- but they don't have 
effect and a warning is logged
+            // Assert.fail("Should not have allowed sensor to be set");
+        } catch (Exception e) {
+            // 
Assert.assertTrue(e.toString().toLowerCase().contains("read-only"), "Error 
message did not contain expected text: "+e);
+        }
+    }
+
     @Test(groups="Integration", invocationCount=20)
     public void testChangeModeManyTimes() throws Exception {
         testChangeMode();
diff --git 
a/policy/src/main/java/org/apache/brooklyn/policy/failover/ElectPrimaryPolicy.java
 
b/policy/src/main/java/org/apache/brooklyn/policy/failover/ElectPrimaryPolicy.java
index a0187f7..ac9531d 100644
--- 
a/policy/src/main/java/org/apache/brooklyn/policy/failover/ElectPrimaryPolicy.java
+++ 
b/policy/src/main/java/org/apache/brooklyn/policy/failover/ElectPrimaryPolicy.java
@@ -112,8 +112,8 @@ public class ElectPrimaryPolicy extends AbstractPolicy 
implements ElectPrimaryCo
     public void setEntity(@SuppressWarnings("deprecation") 
org.apache.brooklyn.api.entity.EntityLocal entity) {
         super.setEntity(entity);
 
-        if (!Entities.isManagedActive(entity)) {
-            // hot standby or entity finished
+        if (Entities.isReadOnly(entity)) {
+            // don't run in hot standby
             return;
         }
         
diff --git a/utils/common/src/main/java/org/apache/brooklyn/test/Asserts.java 
b/utils/common/src/main/java/org/apache/brooklyn/test/Asserts.java
index 6d38525..002e3c6 100644
--- a/utils/common/src/main/java/org/apache/brooklyn/test/Asserts.java
+++ b/utils/common/src/main/java/org/apache/brooklyn/test/Asserts.java
@@ -1531,6 +1531,10 @@ public class Asserts {
         }
 
         public void assertUsedMemoryLessThan(String event, long max, boolean 
push) {
+            assertUsedMemoryLessThan(event, max, push, null);
+        }
+
+        public void assertUsedMemoryLessThan(String event, long max, boolean 
push, String extraFailMessage) {
             long nowUsed = pushUsedMemory(event);
             if (nowUsed > max) {
                 // aggressively try to force GC
@@ -1538,14 +1542,15 @@ public class Asserts {
                 popUsedMemory();
                 nowUsed = pushUsedMemory(event+" (extra GC)");
                 if (nowUsed > max) {
-                    fail("Too much memory used - "+ 
ByteSizeStrings.java().apply(nowUsed)+" > max 
"+ByteSizeStrings.java().apply(max));
+                    fail("Too much memory used - "+ 
ByteSizeStrings.java().apply(nowUsed)+" > max 
"+ByteSizeStrings.java().apply(max)+
+                            (extraFailMessage==null ? "" : " 
"+extraFailMessage));
                 }
             }
             if (!push) popUsedMemory();
         }
         public void assertUsedMemoryMaxDelta(String event, long 
deltaMegabytes, boolean push) {
             final long last = peekLastUsedMemory();
-            assertUsedMemoryLessThan(event, last + deltaMegabytes*1024*1024, 
push);
+            assertUsedMemoryLessThan(event, last + deltaMegabytes*1024*1024, 
push, "(prev was "+ByteSizeStrings.java().apply(last)+", delta limit was 
"+ByteSizeStrings.java().apply(deltaMegabytes*1024*1024)+")");
         }
     }
 }

Reply via email to