Repository: brooklyn-server Updated Branches: refs/heads/master ef20daaac -> 1dcd797ab
Adds ManagementNodeStateListener support Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/1883380a Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/1883380a Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/1883380a Branch: refs/heads/master Commit: 1883380a87c793cb006586698768449b6702e0cd Parents: 854c351 Author: Aled Sage <aled.s...@gmail.com> Authored: Tue Sep 19 15:21:58 2017 +0100 Committer: Aled Sage <aled.s...@gmail.com> Committed: Thu Sep 21 09:09:34 2017 +0100 ---------------------------------------------------------------------- .../mgmt/ha/HighAvailabilityManagerImpl.java | 45 +++-- .../internal/AbstractManagementContext.java | 5 +- .../ManagementNodeStateListenerManager.java | 175 +++++++++++++++++++ .../mgmt/usage/ManagementNodeStateListener.java | 53 ++++++ .../core/server/BrooklynServerConfig.java | 20 +++ .../ha/HighAvailabilityManagerInMemoryTest.java | 5 - .../ha/HighAvailabilityManagerTestFixture.java | 61 ++++++- .../usage/ManagementNodeStateListenerTest.java | 160 +++++++++++++++++ ...ailabilityManagerJcloudsObjectStoreTest.java | 7 +- 9 files changed, 502 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/1883380a/core/src/main/java/org/apache/brooklyn/core/mgmt/ha/HighAvailabilityManagerImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/ha/HighAvailabilityManagerImpl.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/ha/HighAvailabilityManagerImpl.java index a4f8870..4d5a2ee 100644 --- a/core/src/main/java/org/apache/brooklyn/core/mgmt/ha/HighAvailabilityManagerImpl.java +++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/ha/HighAvailabilityManagerImpl.java @@ -39,8 +39,8 @@ import org.apache.brooklyn.api.mgmt.ha.ManagementNodeState; import org.apache.brooklyn.api.mgmt.ha.ManagementNodeSyncRecord; import org.apache.brooklyn.api.mgmt.ha.ManagementPlaneSyncRecord; import org.apache.brooklyn.api.mgmt.ha.ManagementPlaneSyncRecordPersister; -import org.apache.brooklyn.api.mgmt.ha.MementoCopyMode; import org.apache.brooklyn.api.mgmt.ha.ManagementPlaneSyncRecordPersister.Delta; +import org.apache.brooklyn.api.mgmt.ha.MementoCopyMode; import org.apache.brooklyn.api.mgmt.rebind.RebindManager; import org.apache.brooklyn.config.ConfigKey; import org.apache.brooklyn.core.BrooklynFeatureEnablement; @@ -61,9 +61,10 @@ import org.apache.brooklyn.core.mgmt.internal.LocationManagerInternal; import org.apache.brooklyn.core.mgmt.internal.ManagementContextInternal; import org.apache.brooklyn.core.mgmt.internal.ManagementTransitionMode; import org.apache.brooklyn.core.mgmt.persist.BrooklynPersistenceUtils; -import org.apache.brooklyn.core.mgmt.persist.PersistenceActivityMetrics; import org.apache.brooklyn.core.mgmt.persist.BrooklynPersistenceUtils.CreateBackupMode; +import org.apache.brooklyn.core.mgmt.persist.PersistenceActivityMetrics; import org.apache.brooklyn.core.mgmt.rebind.RebindManagerImpl; +import org.apache.brooklyn.core.mgmt.usage.ManagementNodeStateListener; import org.apache.brooklyn.core.server.BrooklynServerConfig; import org.apache.brooklyn.util.collections.MutableList; import org.apache.brooklyn.util.collections.MutableMap; @@ -129,6 +130,7 @@ public class HighAvailabilityManagerImpl implements HighAvailabilityManager { private static final Logger LOG = LoggerFactory.getLogger(HighAvailabilityManagerImpl.class); private final ManagementContextInternal managementContext; + private final ManagementNodeStateListener stateListener; private final String ownNodeId; private volatile ManagementPlaneSyncRecordPersister persister; private volatile PromotionListener promotionListener; @@ -161,8 +163,15 @@ public class HighAvailabilityManagerImpl implements HighAvailabilityManager { private volatile PersistenceActivityMetrics managementStateReadPersistenceMetrics = new PersistenceActivityMetrics(); private final long startTimeUtc; - public HighAvailabilityManagerImpl(ManagementContextInternal managementContext) { + /** + * + * @param managementContext + * @param stateListener The listener to be notified when the (publicised) {@link ManagementNodeState} + * changes, as would be advertised by {@link #getNodeState()} + */ + public HighAvailabilityManagerImpl(ManagementContextInternal managementContext, ManagementNodeStateListener stateListener) { this.managementContext = managementContext; + this.stateListener = stateListener; ownNodeId = managementContext.getManagementNodeId(); startTimeUtc = localTickerUtc.read(); } @@ -244,13 +253,14 @@ public class HighAvailabilityManagerImpl implements HighAvailabilityManager { public void disabled() { disabled = true; // this is notionally the master, just not running; see javadoc for more info + setNodeStateTransitionComplete(true); stop(ManagementNodeState.MASTER); } @Override public void start(HighAvailabilityMode startMode) { - nodeStateTransitionComplete = true; + setNodeStateTransitionComplete(true); disabled = false; running = true; changeMode(startMode, true, true); @@ -420,7 +430,7 @@ public class HighAvailabilityManagerImpl implements HighAvailabilityManager { if ((startMode==HighAvailabilityMode.HOT_STANDBY || startMode==HighAvailabilityMode.HOT_BACKUP)) { if (!ManagementNodeState.isHotProxy(oldState)) { // now transition to hot proxy - nodeStateTransitionComplete = false; + setNodeStateTransitionComplete(false); if (startMode==HighAvailabilityMode.HOT_STANDBY) { // if it should be hot standby, then we may need to promote // inform the world that we are transitioning (but not eligible for promotion while going in to hot standby) @@ -430,7 +440,7 @@ public class HighAvailabilityManagerImpl implements HighAvailabilityManager { try { activateHotProxy(ManagementNodeState.of(startMode).get()).get(); // error above now throws - nodeStateTransitionComplete = true; + setNodeStateTransitionComplete(true); publishHealth(); if (getNodeState()==ManagementNodeState.HOT_STANDBY || getNodeState()==ManagementNodeState.HOT_BACKUP) { @@ -443,17 +453,17 @@ public class HighAvailabilityManagerImpl implements HighAvailabilityManager { } } catch (Exception e) { LOG.warn("Management node "+ownNodeId+" in "+managementContext.getManagementPlaneIdMaybe().or("<new-plane>")+" unable to promote to "+startMode+" (currently "+getNodeState()+"); rethrowing: "+Exceptions.collapseText(e)); - nodeStateTransitionComplete = true; + setNodeStateTransitionComplete(true); throw Exceptions.propagate(e); } } else { // transitioning among hot proxy states - tell the rebind manager managementContext.getRebindManager().stopReadOnly(); managementContext.getRebindManager().startReadOnly(ManagementNodeState.of(startMode).get()); - nodeStateTransitionComplete = true; + setNodeStateTransitionComplete(true); } } else { - nodeStateTransitionComplete = true; + setNodeStateTransitionComplete(true); } if (startMode!=HighAvailabilityMode.DISABLED) registerPollTask(); @@ -528,8 +538,17 @@ public class HighAvailabilityManagerImpl implements HighAvailabilityManager { managementContext.getRebindManager().stopReadOnly(); clearManagedItems(ManagementTransitionMode.transitioning(BrooklynObjectManagementMode.LOADED_READ_ONLY, BrooklynObjectManagementMode.UNMANAGED_PERSISTED)); } + + stateListener.onStateChange(getNodeState()); } + private void setNodeStateTransitionComplete(boolean val) { + nodeStateTransitionComplete = val; + + // Can cause getNodeState() value to change, so notify listener + stateListener.onStateChange(getNodeState()); + } + @Override public ManagementNodeState getNodeState() { ManagementNodeState myNodeState = getInternalNodeState(); @@ -879,7 +898,7 @@ public class HighAvailabilityManagerImpl implements HighAvailabilityManager { wasMaster ? BrooklynObjectManagementMode.MANAGED_PRIMARY : BrooklynObjectManagementMode.LOADED_READ_ONLY, BrooklynObjectManagementMode.UNMANAGED_PERSISTED); - nodeStateTransitionComplete = false; + setNodeStateTransitionComplete(false); switch (toState) { case FAILED: @@ -892,15 +911,15 @@ public class HighAvailabilityManagerImpl implements HighAvailabilityManager { throw new IllegalStateException("Illegal target state: "+toState); } onDemotionStopItems(mode); - nodeStateTransitionComplete = true; + setNodeStateTransitionComplete(true); publishDemotion(wasMaster); if (toState==ManagementNodeState.HOT_BACKUP || toState==ManagementNodeState.HOT_STANDBY) { - nodeStateTransitionComplete = false; + setNodeStateTransitionComplete(false); try { activateHotProxy(toState).get(); } finally { - nodeStateTransitionComplete = true; + setNodeStateTransitionComplete(true); } publishHealth(); } http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/1883380a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/AbstractManagementContext.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/AbstractManagementContext.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/AbstractManagementContext.java index 322d60e..3164263 100644 --- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/AbstractManagementContext.java +++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/AbstractManagementContext.java @@ -143,6 +143,7 @@ public abstract class AbstractManagementContext implements ManagementContextInte protected ClassLoader baseClassLoader; protected Iterable<URL> baseClassPathForScanning; + private final ManagementNodeStateListenerManager managementNodeStateListenerManager; private final RebindManager rebindManager; private final HighAvailabilityManager highAvailabilityManager; @@ -177,7 +178,8 @@ public abstract class AbstractManagementContext implements ManagementContextInte this.storage = new BrooklynStorageImpl(); this.rebindManager = new RebindManagerImpl(this); // TODO leaking "this" reference; yuck - this.highAvailabilityManager = new HighAvailabilityManagerImpl(this); // TODO leaking "this" reference; yuck + this.managementNodeStateListenerManager = new ManagementNodeStateListenerManager(this); // TODO leaking "this" reference; yuck + this.highAvailabilityManager = new HighAvailabilityManagerImpl(this, managementNodeStateListenerManager); // TODO leaking "this" reference; yuck this.entitlementManager = Entitlements.newManager(this, brooklynProperties); this.configSupplierRegistry = new BasicExternalConfigSupplierRegistry(this); // TODO leaking "this" reference; yuck @@ -188,6 +190,7 @@ public abstract class AbstractManagementContext implements ManagementContextInte highAvailabilityManager.stop(); running = false; rebindManager.stop(); + managementNodeStateListenerManager.terminate(); storage.terminate(); // Don't unmanage everything; different entities get given their events at different times // so can cause problems (e.g. a group finds out that a member is unmanaged, before the http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/1883380a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/ManagementNodeStateListenerManager.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/ManagementNodeStateListenerManager.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/ManagementNodeStateListenerManager.java new file mode 100644 index 0000000..e148c7b --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/ManagementNodeStateListenerManager.java @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.core.mgmt.internal; + +import static com.google.common.base.Preconditions.checkNotNull; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.brooklyn.api.mgmt.ha.ManagementNodeState; +import org.apache.brooklyn.core.mgmt.ManagementContextInjectable; +import org.apache.brooklyn.core.mgmt.usage.ManagementNodeStateListener; +import org.apache.brooklyn.core.server.BrooklynServerConfig; +import org.apache.brooklyn.util.core.ClassLoaderUtils; +import org.apache.brooklyn.util.core.flags.TypeCoercions; +import org.apache.brooklyn.util.exceptions.Exceptions; +import org.apache.brooklyn.util.time.Duration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Function; +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +/** + * Handles the notification of {@link ManagementNodeStateListener}s. + * + * @see {@link BrooklynServerConfig#MANAGEMENT_NODE_STATE_LISTENERS} for configuring this. + * @see {@link org.apache.brooklyn.core.mgmt.ha.HighAvailabilityManagerImpl#HighAvailabilityManagerImpl(ManagementContextInternal, ManagementNodeStateListener)} + * for how we get notified of the state-change. + */ +public class ManagementNodeStateListenerManager implements ManagementNodeStateListener { + + private static final Logger LOG = LoggerFactory.getLogger(ManagementNodeStateListenerManager.class); + + private final ManagementContextInternal mgmt; + + private final Object mutex = new Object(); + + private final List<ManagementNodeStateListener> listeners = Lists.newCopyOnWriteArrayList(); + private ManagementNodeState lastPublishedVal; + + private final AtomicInteger listenerQueueSize = new AtomicInteger(); + + private ListeningExecutorService listenerExecutor = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor(new ThreadFactoryBuilder() + .setNameFormat("brooklyn-managementnodestate-listener-%d") + .build())); + + public ManagementNodeStateListenerManager(ManagementContextInternal managementContext) { + this.mgmt = checkNotNull(managementContext, "managementContext"); + + // Register a coercion from String->ManagementNodeStateListener, so that MANAGEMENT_NODE_STATE_LISTENERS defined in brooklyn.cfg + // will be instantiated, given their class names. + TypeCoercions.BrooklynCommonAdaptorTypeCoercions.registerInstanceForClassnameAdapter( + new ClassLoaderUtils(this.getClass(), managementContext), + ManagementNodeStateListener.class); + + // Although changing listeners to Collection<ManagementNodeStateListener> is valid at compile time + // the collection will contain any objects that could not be coerced by the function + // declared above. Generally this means any string declared in brooklyn.properties + // that is not a ManagementNodeStateListener. + Collection<?> rawListeners = managementContext.getBrooklynProperties().getConfig(BrooklynServerConfig.MANAGEMENT_NODE_STATE_LISTENERS); + if (rawListeners != null) { + for (Object obj : rawListeners) { + if (obj == null) { + throw new NullPointerException("null listener in config " + BrooklynServerConfig.MANAGEMENT_NODE_STATE_LISTENERS); + } else if (!(obj instanceof ManagementNodeStateListener)) { + throw new ClassCastException("Configured object is not a "+ManagementNodeStateListener.class.getSimpleName()+". This probably means coercion failed: " + obj); + } else { + ManagementNodeStateListener listener = (ManagementNodeStateListener) obj; + if (listener instanceof ManagementContextInjectable) { + ((ManagementContextInjectable) listener).setManagementContext(managementContext); + } + listeners.add((ManagementNodeStateListener)listener); + } + } + } + } + + @Override + public void onStateChange(ManagementNodeState state) { + // Filtering out duplicates/nulls, schedule the notification of the listeners with this latest value. + synchronized (mutex) { + if (state != null && lastPublishedVal != state) { + LOG.debug("Notifying {} listener(s) of management-node state changed to {}", new Object[] {listeners.size(), state}); + lastPublishedVal = state; + execOnListeners(new Function<ManagementNodeStateListener, Void>() { + @Override + public Void apply(ManagementNodeStateListener listener) { + listener.onStateChange(state); + return null; + } + @Override + public String toString() { + return "stateChange("+state+")"; + }}); + } + } + } + + public void terminate() { + // Wait for the listeners to finish + close the listeners + Duration timeout = mgmt.getBrooklynProperties().getConfig(BrooklynServerConfig.MANAGEMENT_NODE_STATE_LISTENER_TERMINATION_TIMEOUT); + if (listenerQueueSize.get() > 0) { + LOG.info("Management-node-state-listener manager waiting for "+listenerQueueSize+" listener events for up to "+timeout); + } + List<ListenableFuture<?>> futures = Lists.newArrayList(); + for (final ManagementNodeStateListener listener : listeners) { + ListenableFuture<?> future = listenerExecutor.submit(new Runnable() { + @Override + public void run() { + if (listener instanceof Closeable) { + try { + ((Closeable)listener).close(); + } catch (IOException | RuntimeException e) { + LOG.warn("Problem closing management-node-state listener "+listener, e); + Exceptions.propagateIfFatal(e); + } + } + }}); + futures.add(future); + } + try { + Futures.successfulAsList(futures).get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); + } catch (Exception e) { + Exceptions.propagateIfFatal(e); + LOG.warn("Problem terminiating management-node-state listeners (continuing)", e); + } finally { + listenerExecutor.shutdownNow(); + } + } + + private void execOnListeners(final Function<ManagementNodeStateListener, Void> job) { + for (final ManagementNodeStateListener listener : listeners) { + listenerQueueSize.incrementAndGet(); + listenerExecutor.execute(new Runnable() { + @Override + public void run() { + try { + job.apply(listener); + } catch (RuntimeException e) { + LOG.error("Problem notifying listener "+listener+" of "+job, e); + Exceptions.propagateIfFatal(e); + } finally { + listenerQueueSize.decrementAndGet(); + } + }}); + } + } +} http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/1883380a/core/src/main/java/org/apache/brooklyn/core/mgmt/usage/ManagementNodeStateListener.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/usage/ManagementNodeStateListener.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/usage/ManagementNodeStateListener.java new file mode 100644 index 0000000..6b31036 --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/usage/ManagementNodeStateListener.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.core.mgmt.usage; + +import org.apache.brooklyn.api.mgmt.ha.ManagementNodeState; +import org.apache.brooklyn.core.server.BrooklynServerConfig; + +import com.google.common.annotations.Beta; + +/** + * Listener to be notified of this Brooklyn node's state. + * + * See {@link BrooklynServerConfig#MANAGEMENT_NODE_STATE_LISTENERS}. + */ +@Beta +public interface ManagementNodeStateListener { + + /** + * A no-op implementation of {@link ManagementNodeStateListener}, for users to extend. + * + * Users are encouraged to extend this class, which will shield the user + * from the addition of other usage event methods being added. If additional + * methods are added in a future release, a no-op implementation will be + * added to this class. + */ + @Beta + public static class BasicListener implements ManagementNodeStateListener { + @Override + public void onStateChange(ManagementNodeState state) { + } + } + + ManagementNodeStateListener NOOP = new BasicListener(); + + @Beta + void onStateChange(ManagementNodeState state); +} http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/1883380a/core/src/main/java/org/apache/brooklyn/core/server/BrooklynServerConfig.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/server/BrooklynServerConfig.java b/core/src/main/java/org/apache/brooklyn/core/server/BrooklynServerConfig.java index da5922e..59cdf64 100644 --- a/core/src/main/java/org/apache/brooklyn/core/server/BrooklynServerConfig.java +++ b/core/src/main/java/org/apache/brooklyn/core/server/BrooklynServerConfig.java @@ -22,6 +22,7 @@ import static org.apache.brooklyn.core.config.ConfigKeys.newStringConfigKey; import java.io.File; import java.net.URI; +import java.util.List; import java.util.Map; import org.apache.brooklyn.api.mgmt.ManagementContext; @@ -29,11 +30,17 @@ import org.apache.brooklyn.config.ConfigKey; import org.apache.brooklyn.config.StringConfigMap; import org.apache.brooklyn.core.catalog.internal.CatalogInitialization; import org.apache.brooklyn.core.config.ConfigKeys; +import org.apache.brooklyn.core.mgmt.usage.ManagementNodeStateListener; +import org.apache.brooklyn.core.mgmt.usage.UsageListener; import org.apache.brooklyn.util.guava.Maybe; import org.apache.brooklyn.util.os.Os; +import org.apache.brooklyn.util.time.Duration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.collect.ImmutableList; +import com.google.common.reflect.TypeToken; + /** Config keys for the brooklyn server */ public class BrooklynServerConfig { @@ -97,6 +104,19 @@ public class BrooklynServerConfig { + "if null or not set, the legacy beahviour of creating backups where possible (e.g. file system) is currently used; " + "this key is DEPRECATED in favor of promotion and demotion specific flags now defaulting to true"); + @SuppressWarnings("serial") + public static final ConfigKey<List<ManagementNodeStateListener>> MANAGEMENT_NODE_STATE_LISTENERS = ConfigKeys.newConfigKey( + new TypeToken<List<ManagementNodeStateListener>>() {}, + "brooklyn.managementNodeState.listeners", + "Optional list of ManagementNodeStateListener instances", + ImmutableList.<ManagementNodeStateListener>of()); + + public static final ConfigKey<Duration> MANAGEMENT_NODE_STATE_LISTENER_TERMINATION_TIMEOUT = ConfigKeys.newConfigKey( + Duration.class, + "brooklyn.managementNodeState.listeners.timeout", + "Timeout on termination, to wait for queue of management-node-state listener events to be processed", + Duration.TEN_SECONDS); + public static final ConfigKey<String> BROOKLYN_CATALOG_URL = ConfigKeys.newStringConfigKey("brooklyn.catalog.url", "The URL of a custom catalog.bom to load"); http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/1883380a/core/src/test/java/org/apache/brooklyn/core/mgmt/ha/HighAvailabilityManagerInMemoryTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/brooklyn/core/mgmt/ha/HighAvailabilityManagerInMemoryTest.java b/core/src/test/java/org/apache/brooklyn/core/mgmt/ha/HighAvailabilityManagerInMemoryTest.java index 7308810..ab95c1a 100644 --- a/core/src/test/java/org/apache/brooklyn/core/mgmt/ha/HighAvailabilityManagerInMemoryTest.java +++ b/core/src/test/java/org/apache/brooklyn/core/mgmt/ha/HighAvailabilityManagerInMemoryTest.java @@ -53,11 +53,6 @@ public class HighAvailabilityManagerInMemoryTest extends HighAvailabilityManager return new InMemoryObjectStore(); } - @Override - public void testGetManagementPlaneStatus() throws Exception { - super.testGetManagementPlaneStatus(); - } - // extra test that promoteToMaster doesn't interfere with what is managed public void testLocationsStillManagedCorrectlyAfterDoublePromotion() throws NoMachinesAvailableException { HighAvailabilityManagerImpl ha = (HighAvailabilityManagerImpl) managementContext.getHighAvailabilityManager(); http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/1883380a/core/src/test/java/org/apache/brooklyn/core/mgmt/ha/HighAvailabilityManagerTestFixture.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/brooklyn/core/mgmt/ha/HighAvailabilityManagerTestFixture.java b/core/src/test/java/org/apache/brooklyn/core/mgmt/ha/HighAvailabilityManagerTestFixture.java index 506d50b..c4d8a3d 100644 --- a/core/src/test/java/org/apache/brooklyn/core/mgmt/ha/HighAvailabilityManagerTestFixture.java +++ b/core/src/test/java/org/apache/brooklyn/core/mgmt/ha/HighAvailabilityManagerTestFixture.java @@ -24,6 +24,7 @@ import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; +import java.util.Arrays; import java.util.List; import java.util.concurrent.atomic.AtomicLong; @@ -34,9 +35,7 @@ import org.apache.brooklyn.api.mgmt.ha.ManagementPlaneSyncRecord; import org.apache.brooklyn.api.mgmt.ha.ManagementPlaneSyncRecordPersister; import org.apache.brooklyn.core.BrooklynVersion; import org.apache.brooklyn.core.entity.Entities; -import org.apache.brooklyn.core.mgmt.ha.HighAvailabilityManagerImpl; -import org.apache.brooklyn.core.mgmt.ha.ManagementPlaneSyncRecordDeltaImpl; -import org.apache.brooklyn.core.mgmt.ha.ManagementPlaneSyncRecordPersisterToObjectStore; +import org.apache.brooklyn.core.internal.BrooklynProperties; import org.apache.brooklyn.core.mgmt.ha.HighAvailabilityManagerImpl.PromotionListener; import org.apache.brooklyn.core.mgmt.ha.dto.BasicManagementNodeSyncRecord; import org.apache.brooklyn.core.mgmt.ha.dto.BasicManagementNodeSyncRecord.Builder; @@ -45,6 +44,8 @@ import org.apache.brooklyn.core.mgmt.persist.BrooklynMementoPersisterToObjectSto import org.apache.brooklyn.core.mgmt.persist.PersistMode; import org.apache.brooklyn.core.mgmt.persist.PersistenceObjectStore; import org.apache.brooklyn.core.mgmt.rebind.PersistenceExceptionHandlerImpl; +import org.apache.brooklyn.core.mgmt.usage.ManagementNodeStateListener; +import org.apache.brooklyn.core.server.BrooklynServerConfig; import org.apache.brooklyn.core.test.entity.LocalManagementContextForTests; import org.apache.brooklyn.test.Asserts; import org.apache.brooklyn.util.time.Duration; @@ -55,6 +56,7 @@ import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; import com.google.common.base.Ticker; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; @@ -70,10 +72,11 @@ public abstract class HighAvailabilityManagerTestFixture { private HighAvailabilityManagerImpl manager; private Ticker ticker; private AtomicLong currentTime; // used to set the ticker's return value + protected RecordingManagementNodeStateListener stateListener; private RecordingPromotionListener promotionListener; private ClassLoader classLoader = getClass().getClassLoader(); private PersistenceObjectStore objectStore; - + @BeforeMethod(alwaysRun=true) public void setUp() throws Exception { currentTime = new AtomicLong(1000000000L); @@ -83,6 +86,7 @@ public abstract class HighAvailabilityManagerTestFixture { return currentTime.get(); } }; + stateListener = new RecordingManagementNodeStateListener(); promotionListener = new RecordingPromotionListener(); managementContext = newLocalManagementContext(); ownNodeId = managementContext.getManagementNodeId(); @@ -110,7 +114,9 @@ public abstract class HighAvailabilityManagerTestFixture { } protected ManagementContextInternal newLocalManagementContext() { - return LocalManagementContextForTests.newInstance(); + BrooklynProperties brooklynProperties = BrooklynProperties.Factory.newEmpty(); + brooklynProperties.put(BrooklynServerConfig.MANAGEMENT_NODE_STATE_LISTENERS, ImmutableList.of(stateListener)); + return LocalManagementContextForTests.newInstance(brooklynProperties); } protected abstract PersistenceObjectStore newPersistenceObjectStore(); @@ -126,7 +132,7 @@ public abstract class HighAvailabilityManagerTestFixture { // Must not throw NPE, but instead return something sensible (e.g. an empty state record). @Test public void testGetManagementPlaneSyncStateDoesNotThrowNpeBeforePersisterSet() throws Exception { - HighAvailabilityManagerImpl manager2 = new HighAvailabilityManagerImpl(managementContext) + HighAvailabilityManagerImpl manager2 = new HighAvailabilityManagerImpl(managementContext, ManagementNodeStateListener.NOOP) .setPollPeriod(Duration.millis(10)) .setHeartbeatTimeout(Duration.THIRTY_SECONDS) .setPromotionListener(promotionListener) @@ -143,6 +149,7 @@ public abstract class HighAvailabilityManagerTestFixture { // Can get a log.error about our management node's heartbeat being out of date. Caused by // poller first writing a heartbeat record, and then the clock being incremented. But the // next poll fixes it. + @Test public void testPromotes() throws Exception { persister.delta(ManagementPlaneSyncRecordDeltaImpl.builder() .node(newManagerMemento(ownNodeId, ManagementNodeState.HOT_STANDBY)) @@ -158,6 +165,7 @@ public abstract class HighAvailabilityManagerTestFixture { // Expect to be notified of our promotion, as the only other node promotionListener.assertCalledEventually(); + stateListener.assertCalledEventually(ManagementNodeState.INITIALIZING, ManagementNodeState.STANDBY, ManagementNodeState.MASTER); } @Test(groups="Integration") // because one second wait in succeedsContinually @@ -176,10 +184,12 @@ public abstract class HighAvailabilityManagerTestFixture { // (it's normally a fake clock so won't hit 30, even waiting 1s below - but in "IntegrationTest" subclasses it is real!) Asserts.succeedsContinually(new Runnable() { @Override public void run() { - assertTrue(promotionListener.callTimestamps.isEmpty(), "calls="+promotionListener.callTimestamps); + promotionListener.assertNotCalled(); }}); + stateListener.assertCalled(ManagementNodeState.INITIALIZING, ManagementNodeState.STANDBY); } + @Test public void testGetManagementPlaneStatus() throws Exception { // with the name zzzzz the mgr created here should never be promoted by the alphabetical strategy! @@ -204,6 +214,8 @@ public abstract class HighAvailabilityManagerTestFixture { assertEquals(memento.getManagementNodes().get("zzzzzzz_node1").getNodeId(), "zzzzzzz_node1"); assertEquals(memento.getManagementNodes().get("zzzzzzz_node1").getStatus(), ManagementNodeState.STANDBY); assertEquals(memento.getManagementNodes().get("zzzzzzz_node1").getLocalTimestamp(), zzzTime); + + stateListener.assertCalledEventually(ManagementNodeState.INITIALIZING, ManagementNodeState.STANDBY, ManagementNodeState.MASTER); } @Test(groups="Integration", invocationCount=50) //because we have had non-deterministic failures @@ -224,14 +236,20 @@ public abstract class HighAvailabilityManagerTestFixture { ManagementPlaneSyncRecord state = manager.loadManagementPlaneSyncRecord(true); assertEquals(state.getManagementNodes().get("node1").getStatus(), ManagementNodeState.MASTER); assertEquals(state.getManagementNodes().get(ownNodeId).getStatus(), ManagementNodeState.HOT_STANDBY); - + + stateListener.assertCalledEventually(ManagementNodeState.INITIALIZING, ManagementNodeState.STANDBY, ManagementNodeState.INITIALIZING, ManagementNodeState.HOT_STANDBY); + // Simulate passage of time; ticker used by this HA-manager so it will "correctly" publish // its own heartbeat with the new time; but node1's record is now out-of-date. tickerAdvance(Duration.seconds(31)); - + ManagementPlaneSyncRecord state2 = manager.loadManagementPlaneSyncRecord(true); assertEquals(state2.getManagementNodes().get("node1").getStatus(), ManagementNodeState.FAILED); assertNotEquals(state.getManagementNodes().get(ownNodeId).getStatus(), ManagementNodeState.FAILED); + + // Expect to be notified of our promotion + promotionListener.assertCalledEventually(); + stateListener.assertCalledEventually(ManagementNodeState.INITIALIZING, ManagementNodeState.STANDBY, ManagementNodeState.INITIALIZING, ManagementNodeState.HOT_STANDBY, ManagementNodeState.MASTER); } protected Duration getPollPeriod() { @@ -283,4 +301,29 @@ public abstract class HighAvailabilityManagerTestFixture { }}); } } + + public static class RecordingManagementNodeStateListener implements ManagementNodeStateListener { + private final List<ManagementNodeState> events = Lists.newCopyOnWriteArrayList(); + + @Override + public void onStateChange(ManagementNodeState state) { + events.add(state); + } + + public void assertCalled(ManagementNodeState... expected) { + String errMsg = "actual="+events+"; expected="+Arrays.toString(expected); + assertEquals(events, ImmutableList.copyOf(expected), errMsg); + } + + public void assertCalledEventually(ManagementNodeState... expected) { + Asserts.succeedsEventually(new Runnable() { + @Override public void run() { + assertCalled(expected); + }}); + } + + public List<ManagementNodeState> getEvents() { + return ImmutableList.copyOf(events); + } + } } http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/1883380a/core/src/test/java/org/apache/brooklyn/core/mgmt/usage/ManagementNodeStateListenerTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/brooklyn/core/mgmt/usage/ManagementNodeStateListenerTest.java b/core/src/test/java/org/apache/brooklyn/core/mgmt/usage/ManagementNodeStateListenerTest.java new file mode 100644 index 0000000..0c1e07b --- /dev/null +++ b/core/src/test/java/org/apache/brooklyn/core/mgmt/usage/ManagementNodeStateListenerTest.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.brooklyn.core.mgmt.usage; + +import static org.apache.brooklyn.api.mgmt.ha.ManagementNodeState.INITIALIZING; +import static org.apache.brooklyn.api.mgmt.ha.ManagementNodeState.MASTER; +import static org.apache.brooklyn.api.mgmt.ha.ManagementNodeState.TERMINATED; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; + +import java.util.List; + +import org.apache.brooklyn.api.mgmt.ha.ManagementNodeState; +import org.apache.brooklyn.core.internal.BrooklynProperties; +import org.apache.brooklyn.core.mgmt.internal.LocalManagementContext; +import org.apache.brooklyn.core.server.BrooklynServerConfig; +import org.apache.brooklyn.core.test.BrooklynMgmtUnitTestSupport; +import org.apache.brooklyn.core.test.entity.LocalManagementContextForTests; +import org.apache.brooklyn.test.Asserts; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; + +public class ManagementNodeStateListenerTest extends BrooklynMgmtUnitTestSupport { + + @BeforeMethod(alwaysRun=true) + @Override + public void setUp() throws Exception { + RecordingStaticManagementNodeStateListener.clearInstances(); + super.setUp(); + } + + @AfterMethod(alwaysRun=true) + @Override + public void tearDown() throws Exception { + super.tearDown(); + RecordingStaticManagementNodeStateListener.clearInstances(); + } + + private LocalManagementContext newManagementContext(BrooklynProperties brooklynProperties) { + // Need to call HighAvailabilityManager explicitly; otherwise it will never publish + // the ManagementNodeState. + LocalManagementContext result = LocalManagementContextForTests.newInstance(brooklynProperties); + result.getHighAvailabilityManager().disabled(); + result.noteStartupComplete(); + return result; + } + + @Test + public void testAddUsageListenerInstance() throws Exception { + BrooklynProperties brooklynProperties = BrooklynProperties.Factory.newEmpty(); + brooklynProperties.put(BrooklynServerConfig.MANAGEMENT_NODE_STATE_LISTENERS, ImmutableList.of(new RecordingStaticManagementNodeStateListener())); + replaceManagementContext(newManagementContext(brooklynProperties)); + + assertEventsEventually(RecordingStaticManagementNodeStateListener.getInstance(), ImmutableList.of(INITIALIZING, MASTER)); + + mgmt.terminate(); + assertEventsEventually(RecordingStaticManagementNodeStateListener.getInstance(), ImmutableList.of(INITIALIZING, MASTER, TERMINATED)); + } + + @Test + public void testAddUsageListenerViaProperties() throws Exception { + BrooklynProperties brooklynProperties = BrooklynProperties.Factory.newEmpty(); + brooklynProperties.put(BrooklynServerConfig.MANAGEMENT_NODE_STATE_LISTENERS, RecordingStaticManagementNodeStateListener.class.getName()); + replaceManagementContext(newManagementContext(brooklynProperties)); + + assertEventsEventually(RecordingStaticManagementNodeStateListener.getInstance(), ImmutableList.of(INITIALIZING, MASTER)); + } + + @Test + public void testAddMultipleUsageListenersViaProperties() throws Exception { + BrooklynProperties brooklynProperties = BrooklynProperties.Factory.newEmpty(); + brooklynProperties.put(BrooklynServerConfig.MANAGEMENT_NODE_STATE_LISTENERS, RecordingStaticManagementNodeStateListener.class.getName() + "," + RecordingStaticManagementNodeStateListener.class.getName()); + replaceManagementContext(newManagementContext(brooklynProperties)); + + final List<RecordingStaticManagementNodeStateListener> listeners = RecordingStaticManagementNodeStateListener.getInstances(); + assertEquals(listeners.size(), 2); + assertTrue(listeners.get(0) instanceof RecordingStaticManagementNodeStateListener, "listeners="+listeners); + assertTrue(listeners.get(1) instanceof RecordingStaticManagementNodeStateListener, "listeners="+listeners); + + assertEventsEventually(listeners.get(0), ImmutableList.of(INITIALIZING, MASTER)); + assertEventsEventually(listeners.get(1), ImmutableList.of(INITIALIZING, MASTER)); + } + + @Test(expectedExceptions = ClassCastException.class) + public void testErrorWhenConfiguredClassIsNotAListener() { + BrooklynProperties brooklynProperties = BrooklynProperties.Factory.newEmpty(); + brooklynProperties.put(BrooklynServerConfig.MANAGEMENT_NODE_STATE_LISTENERS, Integer.class.getName()); + replaceManagementContext(LocalManagementContextForTests.newInstance(brooklynProperties)); + } + + private void assertEventsEventually(RecordingManagementNodeStateListener listener, List<ManagementNodeState> expected) { + Asserts.succeedsEventually(new Runnable() { + public void run() { + List<ManagementNodeState> actual = listener.getEvents(); + String errMsg = "actual="+actual+"; expected="+expected; + assertEquals(actual, expected, errMsg); + }}); + } + + public static class RecordingStaticManagementNodeStateListener extends RecordingManagementNodeStateListener implements ManagementNodeStateListener { + private static final List<RecordingStaticManagementNodeStateListener> STATIC_INSTANCES = Lists.newCopyOnWriteArrayList(); + + public static RecordingStaticManagementNodeStateListener getInstance() { + return Iterables.getOnlyElement(STATIC_INSTANCES); + } + + public static RecordingStaticManagementNodeStateListener getLastInstance() { + return Iterables.getLast(STATIC_INSTANCES); + } + + public static List<RecordingStaticManagementNodeStateListener> getInstances() { + return ImmutableList.copyOf(STATIC_INSTANCES); + } + + public static void clearInstances() { + STATIC_INSTANCES.clear(); + } + + public RecordingStaticManagementNodeStateListener() { + // Bad to leak a ref to this before constructor finished, but we'll live with it because + // it's just test code! + STATIC_INSTANCES.add(this); + } + } + + public static class RecordingManagementNodeStateListener implements ManagementNodeStateListener { + private final List<ManagementNodeState> events = Lists.newCopyOnWriteArrayList(); + + @Override + public void onStateChange(ManagementNodeState state) { + events.add(state); + } + + public List<ManagementNodeState> getEvents() { + return ImmutableList.copyOf(events); + } + } +} http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/1883380a/locations/jclouds/src/test/java/org/apache/brooklyn/core/mgmt/persist/jclouds/HighAvailabilityManagerJcloudsObjectStoreTest.java ---------------------------------------------------------------------- diff --git a/locations/jclouds/src/test/java/org/apache/brooklyn/core/mgmt/persist/jclouds/HighAvailabilityManagerJcloudsObjectStoreTest.java b/locations/jclouds/src/test/java/org/apache/brooklyn/core/mgmt/persist/jclouds/HighAvailabilityManagerJcloudsObjectStoreTest.java index 316a3b3..83a7f95 100644 --- a/locations/jclouds/src/test/java/org/apache/brooklyn/core/mgmt/persist/jclouds/HighAvailabilityManagerJcloudsObjectStoreTest.java +++ b/locations/jclouds/src/test/java/org/apache/brooklyn/core/mgmt/persist/jclouds/HighAvailabilityManagerJcloudsObjectStoreTest.java @@ -23,17 +23,22 @@ import org.apache.brooklyn.core.mgmt.ha.HighAvailabilityManagerTestFixture; import org.apache.brooklyn.core.mgmt.internal.ManagementContextInternal; import org.apache.brooklyn.core.mgmt.persist.PersistenceObjectStore; import org.apache.brooklyn.core.mgmt.persist.jclouds.JcloudsBlobStoreBasedObjectStore; +import org.apache.brooklyn.core.server.BrooklynServerConfig; import org.apache.brooklyn.core.test.entity.LocalManagementContextForTests; import org.apache.brooklyn.util.text.Identifiers; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; +import com.google.common.collect.ImmutableList; + @Test(groups={"Live", "Live-sanity"}) public class HighAvailabilityManagerJcloudsObjectStoreTest extends HighAvailabilityManagerTestFixture { @Override protected ManagementContextInternal newLocalManagementContext() { - return new LocalManagementContextForTests(BrooklynProperties.Factory.newDefault()); + BrooklynProperties brooklynProperties = BrooklynProperties.Factory.newDefault(); + brooklynProperties.put(BrooklynServerConfig.MANAGEMENT_NODE_STATE_LISTENERS, ImmutableList.of(stateListener)); + return new LocalManagementContextForTests(brooklynProperties); } @Override @BeforeMethod