This is an automated email from the ASF dual-hosted git repository. heneveld pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/brooklyn-server.git
commit c1991d1c440f578305c053dad3ce2f53b518d761 Author: Alex Heneveld <[email protected]> AuthorDate: Fri Sep 10 17:52:37 2021 +0100 lots more exclusions for read-only mode, and ability to interrogate them - no subscriptions - not added to persistence queue --- .../brooklyn/api/mgmt/rebind/RebindManager.java | 4 +- .../catalog/internal/BasicBrooklynCatalog.java | 9 ++- .../brooklyn/core/entity/AbstractEntity.java | 21 ++++--- .../org/apache/brooklyn/core/entity/Entities.java | 4 +- .../brooklyn/core/mgmt/BrooklynTaskTags.java | 1 + .../mgmt/internal/AbstractSubscriptionManager.java | 10 ++++ .../mgmt/internal/LocalSubscriptionManager.java | 5 ++ .../internal/NonDeploymentManagementContext.java | 7 ++- .../mgmt/rebind/PeriodicDeltaChangeListener.java | 14 +++-- .../brooklyn/core/mgmt/rebind/RebindIteration.java | 69 +++++++++++++++++++++- .../core/mgmt/rebind/RebindManagerImpl.java | 12 +++- .../brooklyn/core/objs/AbstractEntityAdjunct.java | 3 +- .../util/core/task/BasicExecutionManager.java | 10 ++++ .../brooklyn/core/mgmt/ha/HotStandbyTest.java | 46 +++++++++------ .../policy/failover/ElectPrimaryPolicy.java | 4 +- .../java/org/apache/brooklyn/test/Asserts.java | 9 ++- 16 files changed, 182 insertions(+), 46 deletions(-) diff --git a/api/src/main/java/org/apache/brooklyn/api/mgmt/rebind/RebindManager.java b/api/src/main/java/org/apache/brooklyn/api/mgmt/rebind/RebindManager.java index 7ad3191..d9d4422 100644 --- a/api/src/main/java/org/apache/brooklyn/api/mgmt/rebind/RebindManager.java +++ b/api/src/main/java/org/apache/brooklyn/api/mgmt/rebind/RebindManager.java @@ -97,6 +97,8 @@ public interface RebindManager { * Interrupts any current activity and waits for it to cease. */ public void stopReadOnly(); + public boolean isReadOnly(); + /** * Resets the effects of previously being read-only, ready to be used again (e.g. when promoting to master). * Expected to be called after {@link #stopReadOnly()} (thus long after {@link #setPersister(BrooklynMementoPersister)}, @@ -105,7 +107,7 @@ public interface RebindManager { public void reset(); /** Starts the appropriate background processes, {@link #startPersistence()} if {@link ManagementNodeState#MASTER}, - * {@link #startReadOnly()} if {@link ManagementNodeState#HOT_STANDBY} or {@link ManagementNodeState#HOT_BACKUP} */ + * {@link #startReadOnly(ManagementNodeState)} if {@link ManagementNodeState#HOT_STANDBY} or {@link ManagementNodeState#HOT_BACKUP} */ public void start(); /** Stops the appropriate background processes, {@link #stopPersistence()} or {@link #stopReadOnly()}, * waiting for activity there to cease (interrupting in the case of {@link #stopReadOnly()}). */ diff --git a/core/src/main/java/org/apache/brooklyn/core/catalog/internal/BasicBrooklynCatalog.java b/core/src/main/java/org/apache/brooklyn/core/catalog/internal/BasicBrooklynCatalog.java index da334d0..71098f3 100644 --- a/core/src/main/java/org/apache/brooklyn/core/catalog/internal/BasicBrooklynCatalog.java +++ b/core/src/main/java/org/apache/brooklyn/core/catalog/internal/BasicBrooklynCatalog.java @@ -142,7 +142,9 @@ public class BasicBrooklynCatalog implements BrooklynCatalog { private volatile CatalogDo manualAdditionsCatalog; private volatile LoadedClassLoader manualAdditionsClasses; private final AggregateClassLoader rootClassLoader = AggregateClassLoader.newInstanceWithNoLoaders(); - + + private static boolean WARNED_RE_DSL_PARSER = false; + /** * Cache of specs (used by {@link #peekSpec(CatalogItem)}). * We assume that no-one is modifying the catalog items (once added) without going through the @@ -649,7 +651,10 @@ public class BasicBrooklynCatalog implements BrooklynCatalog { } } else { - log.info("No Camp-YAML parser registered for parsing catalog item DSL; skipping DSL-parsing"); + if (!WARNED_RE_DSL_PARSER) { + log.warn("No Camp-YAML parser registered for parsing catalog item DSL; skipping DSL-parsing (no further warnings)"); + WARNED_RE_DSL_PARSER = true; + } } Map<Object,Object> catalogMetadata = MutableMap.<Object, Object>builder() diff --git a/core/src/main/java/org/apache/brooklyn/core/entity/AbstractEntity.java b/core/src/main/java/org/apache/brooklyn/core/entity/AbstractEntity.java index 515ed81..7c9607e 100644 --- a/core/src/main/java/org/apache/brooklyn/core/entity/AbstractEntity.java +++ b/core/src/main/java/org/apache/brooklyn/core/entity/AbstractEntity.java @@ -966,7 +966,7 @@ public abstract class AbstractEntity extends AbstractBrooklynObject implements E if (LOG.isTraceEnabled()) LOG.trace(""+AbstractEntity.this+" setAttribute "+attribute+" "+val); - if (Boolean.TRUE.equals(getManagementSupport().isReadOnlyRaw())) { + if (Entities.isReadOnly(AbstractEntity.this)) { T oldVal = getAttribute(attribute); if (Equals.approximately(val, oldVal)) { // ignore, probably an enricher resetting values or something on init @@ -984,13 +984,18 @@ public abstract class AbstractEntity extends AbstractBrooklynObject implements E } } T result = attributesInternal.update(attribute, val); - if (result == null) { - // could be this is a new sensor - entityType.addSensorIfAbsent(attribute); - } - - if (!Objects.equal(result, val)) { - getManagementSupport().getEntityChangeListener().onAttributeChanged(attribute); + + if (!Entities.isReadOnly(AbstractEntity.this)) { + // suppress notifications if read only + + if (result == null) { + // could be this is a new sensor + entityType.addSensorIfAbsent(attribute); + } + + if (!Objects.equal(result, val)) { + getManagementSupport().getEntityChangeListener().onAttributeChanged(attribute); + } } return result; diff --git a/core/src/main/java/org/apache/brooklyn/core/entity/Entities.java b/core/src/main/java/org/apache/brooklyn/core/entity/Entities.java index 4bb5835..43a5104 100644 --- a/core/src/main/java/org/apache/brooklyn/core/entity/Entities.java +++ b/core/src/main/java/org/apache/brooklyn/core/entity/Entities.java @@ -18,7 +18,9 @@ */ package org.apache.brooklyn.core.entity; +import org.apache.brooklyn.api.objs.BrooklynObject; import org.apache.brooklyn.core.mgmt.BrooklynTags; +import org.apache.brooklyn.core.objs.BrooklynObjectInternal; import static org.apache.brooklyn.util.guava.Functionals.isSatisfied; import java.io.Closeable; @@ -241,7 +243,7 @@ public class Entities { } /** - * @deprecated since 0.7; instead use {@link Sanitizer#IS_SECRET_PREDICATE.apply(Object)} + * @deprecated since 0.7; instead use {@link Sanitizer#IS_SECRET_PREDICATE} */ @Deprecated public static boolean isSecret(String name) { diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/BrooklynTaskTags.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/BrooklynTaskTags.java index 2bbd2c8..ea2dd84 100644 --- a/core/src/main/java/org/apache/brooklyn/core/mgmt/BrooklynTaskTags.java +++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/BrooklynTaskTags.java @@ -39,6 +39,7 @@ import org.apache.brooklyn.api.mgmt.Task; import org.apache.brooklyn.api.mgmt.entitlement.EntitlementContext; import org.apache.brooklyn.api.objs.EntityAdjunct; import org.apache.brooklyn.core.config.Sanitizer; +import org.apache.brooklyn.core.entity.Entities; import org.apache.brooklyn.core.entity.EntityInternal; import org.apache.brooklyn.core.mgmt.internal.AbstractManagementContext; import org.apache.brooklyn.core.objs.AbstractEntityAdjunct; diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/AbstractSubscriptionManager.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/AbstractSubscriptionManager.java index e7ae59e..eda80c3 100644 --- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/AbstractSubscriptionManager.java +++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/AbstractSubscriptionManager.java @@ -30,6 +30,7 @@ import org.apache.brooklyn.api.mgmt.SubscriptionManager; import org.apache.brooklyn.api.sensor.Sensor; import org.apache.brooklyn.api.sensor.SensorEvent; import org.apache.brooklyn.api.sensor.SensorEventListener; +import org.apache.brooklyn.core.entity.Entities; import org.apache.brooklyn.util.text.Strings; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -115,6 +116,11 @@ public abstract class AbstractSubscriptionManager implements SubscriptionManager /** @see SubscriptionManager#subscribe(Map, Entity, Sensor, SensorEventListener) */ @Override public final <T> SubscriptionHandle subscribeToChildren(Map<String, Object> flags, final Entity parent, Sensor<T> sensor, SensorEventListener<? super T> listener) { + if (parent!=null && Entities.isReadOnly(parent)) { + LOG.trace("Skipping subscription in read only mode, children of {} {} {}", parent, sensor, flags); + return new Subscription<>(null, null, null); + } + Predicate<SensorEvent<T>> eventFilter = new Predicate<SensorEvent<T>>() { @Override public boolean apply(SensorEvent<T> input) { @@ -134,6 +140,10 @@ public abstract class AbstractSubscriptionManager implements SubscriptionManager /** @see SubscriptionManager#subscribe(Map, Entity, Sensor, SensorEventListener) */ @Override public final <T> SubscriptionHandle subscribeToMembers(Map<String, Object> flags, final Group parent, Sensor<T> sensor, SensorEventListener<? super T> listener) { + if (parent!=null && Entities.isReadOnly(parent)) { + LOG.trace("Skipping subscription in read only mode, members of {} {} {}", parent, sensor, flags); + return new Subscription<>(null, null, null); + } Predicate<SensorEvent<T>> eventFilter = new Predicate<SensorEvent<T>>() { @Override public boolean apply(SensorEvent<T> input) { diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java index ab82e47..9b2cd79 100644 --- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java +++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java @@ -117,6 +117,11 @@ public class LocalSubscriptionManager extends AbstractSubscriptionManager { @SuppressWarnings("unchecked") protected synchronized <T> SubscriptionHandle subscribe(Map<String, Object> flags, final Subscription<T> s) { Entity producer = s.producer; + if (producer!=null && Entities.isReadOnly(producer)) { + LOG.trace("Skipping subscription in read only mode {} {}", s, flags); + return s; + } + Sensor<T> sensor= s.sensor; s.subscriber = getSubscriber(flags, s); s.subscriptionDescription = getSubscriptionDescription(flags, s); diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/NonDeploymentManagementContext.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/NonDeploymentManagementContext.java index 9653934..5613e91 100644 --- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/NonDeploymentManagementContext.java +++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/NonDeploymentManagementContext.java @@ -591,7 +591,12 @@ public class NonDeploymentManagementContext implements ManagementContextInternal public void startReadOnly(ManagementNodeState state) { throw new IllegalStateException("Non-deployment context "+NonDeploymentManagementContext.this+" is not valid for this operation."); } - + + @Override + public boolean isReadOnly() { + return true; + } + @Override public void stopReadOnly() { throw new IllegalStateException("Non-deployment context "+NonDeploymentManagementContext.this+" is not valid for this operation."); diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/PeriodicDeltaChangeListener.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/PeriodicDeltaChangeListener.java index cf8680c..f9b9be4 100644 --- a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/PeriodicDeltaChangeListener.java +++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/PeriodicDeltaChangeListener.java @@ -35,6 +35,7 @@ import org.apache.brooklyn.api.mgmt.ExecutionContext; import org.apache.brooklyn.api.mgmt.Task; import org.apache.brooklyn.api.mgmt.rebind.ChangeListener; import org.apache.brooklyn.api.mgmt.rebind.PersistenceExceptionHandler; +import org.apache.brooklyn.api.mgmt.rebind.RebindManager; import org.apache.brooklyn.api.mgmt.rebind.mementos.BrooklynMementoPersister; import org.apache.brooklyn.api.objs.BrooklynObject; import org.apache.brooklyn.api.objs.BrooklynObjectType; @@ -180,6 +181,7 @@ public class PeriodicDeltaChangeListener implements ChangeListener { } private final ExecutionContext executionContext; + private final RebindManager rebindManager; private final BrooklynMementoPersister persister; @@ -198,7 +200,7 @@ public class PeriodicDeltaChangeListener implements ChangeListener { private final boolean persistEnrichersEnabled; private final boolean persistFeedsEnabled; private final boolean rePersistReferencedObjectsEnabled; - + private final Semaphore persistingMutex = new Semaphore(1); private final Object startStopMutex = new Object(); private final AtomicInteger writeCount = new AtomicInteger(0); @@ -210,6 +212,7 @@ public class PeriodicDeltaChangeListener implements ChangeListener { public PeriodicDeltaChangeListener( Supplier<String> planeIdSupplier, + RebindManager rebindManager, ExecutionContext executionContext, BrooklynMementoPersister persister, PersistenceExceptionHandler exceptionHandler, @@ -218,6 +221,7 @@ public class PeriodicDeltaChangeListener implements ChangeListener { BasicExecutionManager.registerUninterestingTaskName(TASK_NAME,true); this.planeIdSupplier = planeIdSupplier; this.executionContext = executionContext; + this.rebindManager = rebindManager; this.persister = persister; this.exceptionHandler = exceptionHandler; this.metrics = metrics; @@ -560,8 +564,10 @@ public class PeriodicDeltaChangeListener implements ChangeListener { @Override public synchronized void onManaged(BrooklynObject instance) { if (LOG.isTraceEnabled()) LOG.trace("onManaged: {}", instance); - onChanged(instance); - addReferencedObjectsForInitialPersist(instance); + if (!rebindManager.isReadOnly()) { + onChanged(instance); + addReferencedObjectsForInitialPersist(instance); + } } private void addReferencedObjectsForInitialPersist(BrooklynObject instance) { @@ -612,7 +618,7 @@ public class PeriodicDeltaChangeListener implements ChangeListener { @Override public synchronized void onChanged(BrooklynObject instance) { if (LOG.isTraceEnabled()) LOG.trace("onChanged: {}", instance); - if (!isStopped()) { + if (!isStopped() && !rebindManager.isReadOnly()) { deltaCollector.add(instance); } } diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindIteration.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindIteration.java index 46ae2f8..38f4af7 100644 --- a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindIteration.java +++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindIteration.java @@ -20,6 +20,8 @@ package org.apache.brooklyn.core.mgmt.rebind; import static com.google.common.base.Preconditions.checkNotNull; import java.util.function.Supplier; +import java.util.stream.Collectors; +import org.apache.brooklyn.api.mgmt.Task; import static org.apache.brooklyn.core.BrooklynFeatureEnablement.FEATURE_AUTO_FIX_CATALOG_REF_ON_REBIND; import static org.apache.brooklyn.core.BrooklynFeatureEnablement.FEATURE_BACKWARDS_COMPATIBILITY_INFER_CATALOG_ITEM_ON_REBIND; import static org.apache.brooklyn.core.catalog.internal.CatalogUtils.newClassLoadingContextForCatalogItems; @@ -83,6 +85,7 @@ import org.apache.brooklyn.core.entity.EntityInternal; import org.apache.brooklyn.core.feed.AbstractFeed; import org.apache.brooklyn.core.location.AbstractLocation; import org.apache.brooklyn.core.location.internal.LocationInternal; +import org.apache.brooklyn.core.mgmt.BrooklynTaskTags; import org.apache.brooklyn.core.mgmt.classloading.BrooklynClassLoadingContextSequential; import org.apache.brooklyn.core.mgmt.classloading.JavaBrooklynClassLoadingContext; import org.apache.brooklyn.core.mgmt.ha.OsgiManager; @@ -111,6 +114,7 @@ import org.apache.brooklyn.util.collections.MutableMap; import org.apache.brooklyn.util.core.ClassLoaderUtils; import org.apache.brooklyn.util.core.flags.FlagUtils; import org.apache.brooklyn.util.core.task.BasicExecutionContext; +import org.apache.brooklyn.util.core.task.BasicExecutionManager; import org.apache.brooklyn.util.core.task.Tasks; import org.apache.brooklyn.util.exceptions.Exceptions; import org.apache.brooklyn.util.guava.Maybe; @@ -118,6 +122,7 @@ import org.apache.brooklyn.util.javalang.Reflections; import org.apache.brooklyn.util.osgi.VersionedName; import org.apache.brooklyn.util.stream.InputStreamSource; import org.apache.brooklyn.util.text.Strings; +import org.apache.brooklyn.util.time.CountdownTimer; import org.apache.brooklyn.util.time.Duration; import org.apache.brooklyn.util.time.Time; import org.slf4j.Logger; @@ -256,6 +261,34 @@ public abstract class RebindIteration { } protected void doRun() throws Exception { + if (readOnlyRebindCount.get()>1) { + // wait for tasks + Collection<Task<?>> entityTasks = ((BasicExecutionManager) managementContext.getExecutionManager()).allTasksLive() + .stream().filter(t -> BrooklynTaskTags.getContextEntity(t)!=null).collect(Collectors.toList()); + List<Task<?>> openTasks; + CountdownTimer time = CountdownTimer.newInstanceStarted(Duration.seconds(15)); + do { + openTasks = entityTasks.stream().filter(t -> !t.isDone()).collect(Collectors.toList()); + if (openTasks.isEmpty()) break; + if (time.isExpired()) { + LOG.warn("Aborting "+openTasks.size()+" incomplete task(s) before rebinding again: "+openTasks); + openTasks.forEach(t -> t.cancel(true)); + } + if (time.getDurationElapsed().isShorterThan(Duration.millis(200))) { + LOG.info("Waiting on " + openTasks.size() + " task(s) before rebinding again: " + openTasks); + } else { + LOG.debug("Waiting on " + openTasks.size() + " task(s) before rebinding again: " + openTasks); + } + Time.sleep(Duration.millis(200)); + } while (true); + + entityTasks.forEach( ((BasicExecutionManager) managementContext.getExecutionManager())::deleteTask ); + + List<Task<?>> otherDoneTasks = ((BasicExecutionManager) managementContext.getExecutionManager()).allTasksLive() + .stream().filter(Task::isDone).collect(Collectors.toList()); + otherDoneTasks.forEach( ((BasicExecutionManager) managementContext.getExecutionManager())::deleteTask ); + } + loadManifestFiles(); initPlaneId(); installBundlesAndRebuildCatalog(); @@ -263,7 +296,7 @@ public abstract class RebindIteration { instantiateMementos(); // adjuncts depend on actual mementos; whereas entity works off special memento manifest, // and location, bundles etc just take type and id - instantiateAdjuncts(instantiator); + instantiateAdjuncts(instantiator); reconstructEverything(); associateAdjunctsWithEntities(); manageTheObjects(); @@ -654,7 +687,6 @@ public abstract class RebindIteration { } protected void associateAdjunctsWithEntities() { - checkEnteringPhase(7); logRebindingDebug("RebindManager associating adjuncts to entities"); @@ -684,12 +716,43 @@ public abstract class RebindIteration { ((EntityInternal)entity).getExecutionContext().get(Tasks.<Void>builder() .displayName("rebind-adjuncts-"+entity.getId()) .dynamic(false) - .body(body) + .body(new RebindAdjuncts(entityMemento, entity, rebindContext, exceptionHandler)) .build()); } } } + protected static class RebindAdjuncts implements Runnable { + private EntityMemento entityMemento; + private Entity entity; + private RebindContextImpl rebindContext; + private RebindExceptionHandler exceptionHandler; + + public RebindAdjuncts(EntityMemento entityMemento, Entity entity, RebindContextImpl rebindContext, RebindExceptionHandler exceptionHandler) { + this.entityMemento = entityMemento; + this.entity = entity; + this.rebindContext = rebindContext; + this.exceptionHandler = exceptionHandler; + } + + @Override + public void run() { + try { + entityMemento.injectTypeClass(entity.getClass()); + // TODO these call to the entity which in turn sets the entity on the underlying feeds and enrichers; + // that is taken as the cue to start, but it should not be. start should be a separate call. + ((EntityInternal)entity).getRebindSupport().addPolicies(rebindContext, entityMemento); + ((EntityInternal)entity).getRebindSupport().addEnrichers(rebindContext, entityMemento); + ((EntityInternal)entity).getRebindSupport().addFeeds(rebindContext, entityMemento); + + entityMemento = null; + entity = null; + } catch (Exception e) { + exceptionHandler.onRebindFailed(BrooklynObjectType.ENTITY, entity, e); + } + } + } + protected void manageTheObjects() { checkEnteringPhase(8); diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerImpl.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerImpl.java index 21363be..3a597ab 100644 --- a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerImpl.java +++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerImpl.java @@ -220,7 +220,12 @@ public class RebindManagerImpl implements RebindManager { public boolean isReadOnlyRunning() { return readOnlyRunning; } - + + @Override + public boolean isReadOnly() { + return isReadOnlyRunning(); + } + public boolean isReadOnlyStopping() { return readOnlyTask!=null && !readOnlyRunning; } @@ -251,6 +256,7 @@ public class RebindManagerImpl implements RebindManager { this.persistenceRealChangeListener = new PeriodicDeltaChangeListener( new PlaneIdSupplier(), + this, managementContext.getServerExecutionContext(), persistenceStoreAccess, exceptionHandler, @@ -290,7 +296,9 @@ public class RebindManagerImpl implements RebindManager { persistenceRunning = true; readOnlyRebindCount.set(Integer.MIN_VALUE); persistenceStoreAccess.enableWriteAccess(); - if (persistenceRealChangeListener != null) persistenceRealChangeListener.start(); + if (persistenceRealChangeListener != null) { + persistenceRealChangeListener.start(); + } } @Override diff --git a/core/src/main/java/org/apache/brooklyn/core/objs/AbstractEntityAdjunct.java b/core/src/main/java/org/apache/brooklyn/core/objs/AbstractEntityAdjunct.java index 7d63b0e..2562c46 100644 --- a/core/src/main/java/org/apache/brooklyn/core/objs/AbstractEntityAdjunct.java +++ b/core/src/main/java/org/apache/brooklyn/core/objs/AbstractEntityAdjunct.java @@ -18,6 +18,7 @@ */ package org.apache.brooklyn.core.objs; +import java.util.concurrent.ConcurrentHashMap; import static org.apache.brooklyn.util.JavaGroovyEquivalents.groovyTruth; import java.util.Collection; @@ -111,7 +112,7 @@ public abstract class AbstractEntityAdjunct extends AbstractBrooklynObject imple @SetFromFlag(value="uniqueTag") protected String uniqueTag; - private Map<String, HighlightTuple> highlights = new HashMap<>(); + private Map<String, HighlightTuple> highlights = new ConcurrentHashMap<>(); /** Name of a highlight that indicates the last action taken by this adjunct. */ public static String HIGHLIGHT_NAME_LAST_ACTION = "lastAction"; diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java index e59e9ed..29984e2 100644 --- a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java +++ b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java @@ -47,6 +47,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; import org.apache.brooklyn.api.entity.Entity; import org.apache.brooklyn.api.internal.BrooklynLoggingCategories; import org.apache.brooklyn.api.mgmt.ExecutionManager; @@ -58,6 +59,8 @@ import org.apache.brooklyn.core.BrooklynLogging; import org.apache.brooklyn.core.config.Sanitizer; import org.apache.brooklyn.core.entity.Entities; import org.apache.brooklyn.core.mgmt.BrooklynTaskTags; +import org.apache.brooklyn.core.mgmt.BrooklynTaskTags.WrappedEntity; +import org.apache.brooklyn.core.mgmt.BrooklynTaskTags.WrappedStream; import org.apache.brooklyn.core.mgmt.entitlement.Entitlements; import org.apache.brooklyn.util.collections.MutableList; import org.apache.brooklyn.util.collections.MutableMap; @@ -368,6 +371,13 @@ public class BasicExecutionManager implements ExecutionManager { log.warn("Deleting submitted task before completion: "+removed+"; this task will continue to run in the background outwith "+this+", but perhaps it should have been cancelled?"); } } + task.getTags().forEach(t -> { + // remove tags which might have references to entities etc (help out garbage collector) + if (t instanceof TaskInternal) { + Set<Object> tagsM = ((TaskInternal) t).getMutableTags(); + tagsM.removeAll(tagsM.stream().filter(tag -> tag instanceof WrappedStream || tag instanceof WrappedEntity).collect(Collectors.toList())); + } + }); return removed != null; } diff --git a/core/src/test/java/org/apache/brooklyn/core/mgmt/ha/HotStandbyTest.java b/core/src/test/java/org/apache/brooklyn/core/mgmt/ha/HotStandbyTest.java index f3bd7fc..f1e7a41 100644 --- a/core/src/test/java/org/apache/brooklyn/core/mgmt/ha/HotStandbyTest.java +++ b/core/src/test/java/org/apache/brooklyn/core/mgmt/ha/HotStandbyTest.java @@ -19,6 +19,7 @@ package org.apache.brooklyn.core.mgmt.ha; import java.util.Collection; +import org.apache.brooklyn.api.sensor.AttributeSensor; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; @@ -232,13 +233,10 @@ public class HotStandbyTest { assertEquals(appRO.getConfig(TestEntity.CONF_NAME), "first-app"); assertEquals(appRO.getAttribute(TestEntity.SEQUENCE), (Integer)3); - try { - ((TestApplication)appRO).sensors().set(TestEntity.SEQUENCE, 4); - Assert.fail("Should not have allowed sensor to be set"); - } catch (Exception e) { - Assert.assertTrue(e.toString().toLowerCase().contains("read-only"), "Error message did not contain expected text: "+e); - } - assertEquals(appRO.getAttribute(TestEntity.SEQUENCE), (Integer)3); + setSensorOnReadOnly(appRO, TestEntity.SEQUENCE, 4); + + // it will revert to 3 + EntityAsserts.assertAttributeEqualsEventually(appRO, TestEntity.SEQUENCE, (Integer)3); } @Test @@ -410,11 +408,13 @@ public class HotStandbyTest { // Jenkins: java.lang.AssertionError: Too much memory used - 158m > max 154m // Svet local: java.lang.AssertionError: Too much memory used - 16m > max 13m // The test is not deterministic so marking as "Manual", i.e. probably shouldn't be included in automated tests. + // But it is a very useful check that hot standby is stable! @Test(groups={"Integration", "Broken", "Manual"}) public void testHotStandbyDoesNotLeakBigObjects() throws Exception { log.info("Starting test "+JavaClassNames.niceClassAndMethod()); final int SIZE = 5; - final int SIZE_UP_BOUND = SIZE+2; + final int SIZE_UP_BOUND1 = SIZE+2; + final int SIZE_UP_BOUND2 = SIZE_UP_BOUND1; final int SIZE_DOWN_BOUND = SIZE-1; final int GRACE = 2; // the XML persistence uses a lot of space, we approx at between 2x and 3c @@ -426,16 +426,18 @@ public class HotStandbyTest { noteUsedMemory("Finished seeding"); Long initialUsed = memoryAssertions.peekLastUsedMemory(); app.config().set(TestEntity.CONF_OBJECT, new BigObject(SIZE*1000*1000)); - assertUsedMemoryMaxDelta("Set a big config object", SIZE_UP_BOUND); + assertUsedMemoryMaxDelta("Set a big config object", SIZE_UP_BOUND1); forcePersistNow(n1); assertUsedMemoryMaxDelta("Persisted a big config object", SIZE_IN_XML); HaMgmtNode n2 = createHotStandby(Duration.PRACTICALLY_FOREVER); forceRebindNow(n2); - assertUsedMemoryMaxDelta("Rebinded", SIZE_UP_BOUND); + assertUsedMemoryMaxDelta("Rebinded", SIZE_UP_BOUND2); - for (int i=0; i<10; i++) + for (int i=0; i<10; i++) { forceRebindNow(n2); + } + assertUsedMemoryMaxDelta("Several more rebinds", GRACE); for (int i=0; i<10; i++) { forcePersistNow(n1); @@ -446,10 +448,10 @@ public class HotStandbyTest { app.config().set(TestEntity.CONF_OBJECT, "big is now small"); assertUsedMemoryMaxDelta("Big made small at primary", -SIZE_DOWN_BOUND); forcePersistNow(n1); - assertUsedMemoryMaxDelta("And persisted", -SIZE_IN_XML_DOWN); + assertUsedMemoryMaxDelta("And persisted", 0); //-SIZE_IN_XML_DOWN); forceRebindNow(n2); - assertUsedMemoryMaxDelta("And at secondary", -SIZE_DOWN_BOUND); + assertUsedMemoryMaxDelta("And at secondary", 0); //-SIZE_DOWN_BOUND); Entities.unmanage(app); forcePersistNow(n1); @@ -543,12 +545,7 @@ public class HotStandbyTest { }); Application app2RO = n1.mgmt.lookup(app2.getId(), Application.class); - try { - ((TestApplication)app2RO).sensors().set(TestEntity.SEQUENCE, 4); - Assert.fail("Should not have allowed sensor to be set"); - } catch (Exception e) { - Assert.assertTrue(e.toString().toLowerCase().contains("read-only"), "Error message did not contain expected text: "+e); - } + setSensorOnReadOnly(app2RO, TestEntity.SEQUENCE, 4); n1.ha.changeMode(HighAvailabilityMode.AUTO); n2.ha.changeMode(HighAvailabilityMode.HOT_STANDBY, true, false); @@ -574,6 +571,17 @@ public class HotStandbyTest { EntityAsserts.assertAttributeEquals(app2BRO, TestEntity.SEQUENCE, 4); } + private <T> void setSensorOnReadOnly(Entity item, AttributeSensor<T> sensor, T value) { + try { + item.sensors().set(sensor, value); + + // we now allow access to the sensors -- but they don't have effect and a warning is logged + // Assert.fail("Should not have allowed sensor to be set"); + } catch (Exception e) { + // Assert.assertTrue(e.toString().toLowerCase().contains("read-only"), "Error message did not contain expected text: "+e); + } + } + @Test(groups="Integration", invocationCount=20) public void testChangeModeManyTimes() throws Exception { testChangeMode(); diff --git a/policy/src/main/java/org/apache/brooklyn/policy/failover/ElectPrimaryPolicy.java b/policy/src/main/java/org/apache/brooklyn/policy/failover/ElectPrimaryPolicy.java index a0187f7..ac9531d 100644 --- a/policy/src/main/java/org/apache/brooklyn/policy/failover/ElectPrimaryPolicy.java +++ b/policy/src/main/java/org/apache/brooklyn/policy/failover/ElectPrimaryPolicy.java @@ -112,8 +112,8 @@ public class ElectPrimaryPolicy extends AbstractPolicy implements ElectPrimaryCo public void setEntity(@SuppressWarnings("deprecation") org.apache.brooklyn.api.entity.EntityLocal entity) { super.setEntity(entity); - if (!Entities.isManagedActive(entity)) { - // hot standby or entity finished + if (Entities.isReadOnly(entity)) { + // don't run in hot standby return; } diff --git a/utils/common/src/main/java/org/apache/brooklyn/test/Asserts.java b/utils/common/src/main/java/org/apache/brooklyn/test/Asserts.java index 6d38525..002e3c6 100644 --- a/utils/common/src/main/java/org/apache/brooklyn/test/Asserts.java +++ b/utils/common/src/main/java/org/apache/brooklyn/test/Asserts.java @@ -1531,6 +1531,10 @@ public class Asserts { } public void assertUsedMemoryLessThan(String event, long max, boolean push) { + assertUsedMemoryLessThan(event, max, push, null); + } + + public void assertUsedMemoryLessThan(String event, long max, boolean push, String extraFailMessage) { long nowUsed = pushUsedMemory(event); if (nowUsed > max) { // aggressively try to force GC @@ -1538,14 +1542,15 @@ public class Asserts { popUsedMemory(); nowUsed = pushUsedMemory(event+" (extra GC)"); if (nowUsed > max) { - fail("Too much memory used - "+ ByteSizeStrings.java().apply(nowUsed)+" > max "+ByteSizeStrings.java().apply(max)); + fail("Too much memory used - "+ ByteSizeStrings.java().apply(nowUsed)+" > max "+ByteSizeStrings.java().apply(max)+ + (extraFailMessage==null ? "" : " "+extraFailMessage)); } } if (!push) popUsedMemory(); } public void assertUsedMemoryMaxDelta(String event, long deltaMegabytes, boolean push) { final long last = peekLastUsedMemory(); - assertUsedMemoryLessThan(event, last + deltaMegabytes*1024*1024, push); + assertUsedMemoryLessThan(event, last + deltaMegabytes*1024*1024, push, "(prev was "+ByteSizeStrings.java().apply(last)+", delta limit was "+ByteSizeStrings.java().apply(deltaMegabytes*1024*1024)+")"); } } }
