Merge remote-tracking branch 'aledsage/fix/rebind-and-ha-20140606' into reviews
Conflicts: core/src/main/java/brooklyn/entity/rebind/RebindManagerImpl.java core/src/main/java/brooklyn/management/ha/HighAvailabilityManagerImpl.java core/src/test/java/brooklyn/management/ha/HighAvailabilityManagerTestFixture.java usage/launcher/src/main/java/brooklyn/launcher/BrooklynLauncher.java Mainly different-but-the-same logging and some minor interface/refactoring changes Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/458a117c Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/458a117c Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/458a117c Branch: refs/heads/master Commit: 458a117c39d07546afd76250c0ca44a123e43b32 Parents: 9f42565 85dc83e Author: Alex Heneveld <alex.henev...@cloudsoftcorp.com> Authored: Tue Jun 10 01:27:02 2014 +0100 Committer: Alex Heneveld <alex.henev...@cloudsoftcorp.com> Committed: Tue Jun 10 01:27:02 2014 +0100 ---------------------------------------------------------------------- .../brooklyn/entity/rebind/ChangeListener.java | 12 +++ .../brooklyn/entity/rebind/RebindManager.java | 4 + .../mementos/BrooklynMementoPersister.java | 8 +- .../enricher/basic/AbstractEnricher.java | 8 ++ .../brooklyn/enricher/basic/Aggregator.java | 14 +-- .../java/brooklyn/enricher/basic/Combiner.java | 14 +-- .../brooklyn/enricher/basic/Transformer.java | 13 ++- .../brooklyn/entity/basic/AbstractEntity.java | 18 ++-- .../entity/basic/EntityDynamicType.java | 7 +- .../entity/rebind/BasicEntityRebindSupport.java | 1 + .../rebind/ImmediateDeltaChangeListener.java | 36 ++++++-- .../rebind/PeriodicDeltaChangeListener.java | 38 ++++++-- .../rebind/RebindContextLookupContext.java | 21 +---- .../entity/rebind/RebindManagerImpl.java | 87 +++++++++++++++++- .../entity/rebind/dto/MementosGenerators.java | 20 ++++- .../BrooklynMementoPersisterInMemory.java | 75 ++++++++++------ .../rebind/persister/XmlMementoSerializer.java | 29 +++--- .../event/feed/AttributePollHandler.java | 13 ++- .../location/basic/AbstractLocation.java | 6 +- .../ha/HighAvailabilityManagerImpl.java | 9 +- .../internal/EntityChangeListener.java | 18 ++-- .../internal/EntityManagementSupport.java | 18 +++- .../policy/basic/AbstractEntityAdjunct.java | 11 ++- .../brooklyn/policy/basic/AbstractPolicy.java | 8 ++ .../java/brooklyn/enricher/EnrichersTest.java | 93 ++++++++++++++++++-- .../entity/rebind/RebindEnricherTest.java | 44 ++++++++- .../entity/rebind/RebindEntityTest.java | 60 +++++++++++-- .../entity/rebind/RebindLocationTest.java | 40 +++++++++ .../entity/rebind/RebindManagerTest.java | 45 ++++++++++ .../entity/rebind/RebindPolicyTest.java | 88 +++++++++++++++++- .../persister/XmlMementoSerializerTest.java | 32 ++----- .../ha/HighAvailabilityManagerTestFixture.java | 18 ++++ .../brooklyn/location/jclouds/JcloudsUtil.java | 5 +- .../jclouds/RebindJcloudsLocationTest.java | 48 ++++++++++ .../entity/webapp/DynamicWebAppClusterImpl.java | 12 ++- .../brooklyn/launcher/BrooklynLauncher.java | 4 +- 36 files changed, 800 insertions(+), 177 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/458a117c/core/src/main/java/brooklyn/entity/basic/AbstractEntity.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/458a117c/core/src/main/java/brooklyn/entity/rebind/RebindContextLookupContext.java ---------------------------------------------------------------------- diff --cc core/src/main/java/brooklyn/entity/rebind/RebindContextLookupContext.java index 1c021c7,0000000..392ea06 mode 100644,000000..100644 --- a/core/src/main/java/brooklyn/entity/rebind/RebindContextLookupContext.java +++ b/core/src/main/java/brooklyn/entity/rebind/RebindContextLookupContext.java @@@ -1,63 -1,0 +1,50 @@@ +package brooklyn.entity.rebind; + - import org.slf4j.Logger; - import org.slf4j.LoggerFactory; - +import brooklyn.entity.Entity; +import brooklyn.location.Location; +import brooklyn.mementos.BrooklynMementoPersister.LookupContext; +import brooklyn.policy.Enricher; +import brooklyn.policy.Policy; + +public class RebindContextLookupContext implements LookupContext { + - private static final Logger LOG = LoggerFactory.getLogger(RebindContextLookupContext.class); - + protected final RebindContext rebindContext; + protected final RebindExceptionHandler exceptionHandler; + + public RebindContextLookupContext(RebindContext rebindContext, RebindExceptionHandler exceptionHandler) { + this.rebindContext = rebindContext; + this.exceptionHandler = exceptionHandler; + } + - @Override public Entity lookupEntity(Class<?> type, String id) { ++ @Override public Entity lookupEntity(String id) { + Entity result = rebindContext.getEntity(id); + if (result == null) { + result = exceptionHandler.onDanglingEntityRef(id); - } else if (type != null && !type.isInstance(result)) { - LOG.warn("Entity with id "+id+" does not match type "+type+"; returning "+result); + } + return result; + } + - @Override public Location lookupLocation(Class<?> type, String id) { ++ @Override public Location lookupLocation(String id) { + Location result = rebindContext.getLocation(id); + if (result == null) { + result = exceptionHandler.onDanglingLocationRef(id); - } else if (type != null && !type.isInstance(result)) { - LOG.warn("Location with id "+id+" does not match type "+type+"; returning "+result); + } + return result; + } + - @Override public Policy lookupPolicy(Class<?> type, String id) { ++ @Override public Policy lookupPolicy(String id) { + Policy result = rebindContext.getPolicy(id); + if (result == null) { + result = exceptionHandler.onDanglingPolicyRef(id); - } else if (type != null && !type.isInstance(result)) { - LOG.warn("Policy with id "+id+" does not match type "+type+"; returning "+result); + } + return result; + } + - @Override public Enricher lookupEnricher(Class<?> type, String id) { ++ @Override public Enricher lookupEnricher(String id) { + Enricher result = rebindContext.getEnricher(id); + if (result == null) { + result = exceptionHandler.onDanglingEnricherRef(id); - } else if (type != null && !type.isInstance(result)) { - LOG.warn("Enricher with id "+id+" does not match type "+type+"; returning "+result); + } + return result; + } +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/458a117c/core/src/main/java/brooklyn/entity/rebind/RebindManagerImpl.java ---------------------------------------------------------------------- diff --cc core/src/main/java/brooklyn/entity/rebind/RebindManagerImpl.java index 658b1c1,c2dc932..c10a17d --- a/core/src/main/java/brooklyn/entity/rebind/RebindManagerImpl.java +++ b/core/src/main/java/brooklyn/entity/rebind/RebindManagerImpl.java @@@ -45,7 -47,7 +47,8 @@@ import brooklyn.util.collections.Mutabl import brooklyn.util.exceptions.Exceptions; import brooklyn.util.flags.FlagUtils; import brooklyn.util.javalang.Reflections; + import brooklyn.util.task.BasicExecutionContext; +import brooklyn.util.text.Strings; import brooklyn.util.time.Duration; import com.google.common.annotations.VisibleForTesting; http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/458a117c/core/src/main/java/brooklyn/entity/rebind/dto/MementosGenerators.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/458a117c/core/src/main/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterInMemory.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/458a117c/core/src/main/java/brooklyn/management/ha/HighAvailabilityManagerImpl.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/458a117c/core/src/test/java/brooklyn/entity/rebind/RebindEntityTest.java ---------------------------------------------------------------------- diff --cc core/src/test/java/brooklyn/entity/rebind/RebindEntityTest.java index a3163f4,67c210d..14551d2 --- a/core/src/test/java/brooklyn/entity/rebind/RebindEntityTest.java +++ b/core/src/test/java/brooklyn/entity/rebind/RebindEntityTest.java @@@ -522,11 -516,27 +516,27 @@@ public class RebindEntityTest extends R } @Test + public void testRebindPersistsDynamicAttribute() throws Exception { + final String sensorName = "test.mydynamicsensor"; + final String sensorDescription = "My description"; + final AttributeSensor<String> MY_DYNAMIC_SENSOR = new BasicAttributeSensor<String>( + String.class, sensorName, sensorDescription); + + origApp.setAttribute(MY_DYNAMIC_SENSOR, "myval"); + assertEquals(origApp.getEntityType().getSensor(sensorName).getDescription(), sensorDescription); + + newApp = rebind(); + + assertEquals(newApp.getAttribute(MY_DYNAMIC_SENSOR), "myval"); + assertEquals(newApp.getEntityType().getSensor(sensorName).getDescription(), sensorDescription); + } + + @Test public void testRebindWhenPreviousAppDestroyedHasNoApp() throws Exception { origApp.stop(); - + RebindTestUtils.waitForPersisted(origManagementContext); - LocalManagementContext newManagementContext = RebindTestUtils.newPersistingManagementContextUnstarted(mementoDir, classLoader); + newManagementContext = RebindTestUtils.newPersistingManagementContextUnstarted(mementoDir, classLoader); List<Application> newApps = newManagementContext.getRebindManager().rebind(classLoader); newManagementContext.getRebindManager().start(); http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/458a117c/core/src/test/java/brooklyn/management/ha/HighAvailabilityManagerTestFixture.java ---------------------------------------------------------------------- diff --cc core/src/test/java/brooklyn/management/ha/HighAvailabilityManagerTestFixture.java index 9f7601a,0000000..959c0ba mode 100644,000000..100644 --- a/core/src/test/java/brooklyn/management/ha/HighAvailabilityManagerTestFixture.java +++ b/core/src/test/java/brooklyn/management/ha/HighAvailabilityManagerTestFixture.java @@@ -1,214 -1,0 +1,232 @@@ +package brooklyn.management.ha; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotEquals; ++import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; + +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import brooklyn.BrooklynVersion; +import brooklyn.entity.basic.Entities; +import brooklyn.entity.rebind.persister.BrooklynMementoPersisterToObjectStore; +import brooklyn.entity.rebind.persister.PersistMode; +import brooklyn.entity.rebind.persister.PersistenceObjectStore; +import brooklyn.entity.rebind.plane.dto.BasicManagementNodeSyncRecord; +import brooklyn.management.ha.HighAvailabilityManagerImpl.PromotionListener; +import brooklyn.management.internal.ManagementContextInternal; +import brooklyn.test.Asserts; +import brooklyn.test.entity.LocalManagementContextForTests; +import brooklyn.util.time.Duration; + +import com.google.common.base.Ticker; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Lists; + +@Test +public abstract class HighAvailabilityManagerTestFixture { + + @SuppressWarnings("unused") + private static final Logger log = LoggerFactory.getLogger(HighAvailabilityManagerTestFixture.class); + + private ManagementPlaneSyncRecordPersister persister; + private ManagementContextInternal managementContext; + private String ownNodeId; + private HighAvailabilityManagerImpl manager; + private Ticker ticker; + private AtomicLong currentTime; // used to set the ticker's return value + private RecordingPromotionListener promotionListener; + private ClassLoader classLoader = getClass().getClassLoader(); + private PersistenceObjectStore objectStore; + + @BeforeMethod(alwaysRun=true) + public void setUp() throws Exception { + currentTime = new AtomicLong(System.currentTimeMillis()); + ticker = new Ticker() { + // strictly not a ticker because returns millis UTC, but it works fine even so + @Override public long read() { + return currentTime.get(); + } + }; + promotionListener = new RecordingPromotionListener(); + managementContext = newLocalManagementContext(); + ownNodeId = managementContext.getManagementNodeId(); + objectStore = newPersistenceObjectStore(); + objectStore.prepareForUse(managementContext, PersistMode.CLEAN); + persister = new ManagementPlaneSyncRecordPersisterToObjectStore(managementContext, objectStore, classLoader); + BrooklynMementoPersisterToObjectStore persisterObj = new BrooklynMementoPersisterToObjectStore(objectStore, classLoader); + managementContext.getRebindManager().setPersister(persisterObj); + manager = new HighAvailabilityManagerImpl(managementContext) + .setPollPeriod(Duration.millis(10)) + .setHeartbeatTimeout(Duration.THIRTY_SECONDS) + .setPromotionListener(promotionListener) + .setTicker(ticker) + .setPersister(persister); + } + + protected ManagementContextInternal newLocalManagementContext() { + return new LocalManagementContextForTests(); + } + + protected abstract PersistenceObjectStore newPersistenceObjectStore(); + + @AfterMethod(alwaysRun=true) + public void tearDown() throws Exception { + if (manager != null) manager.stop(); + if (managementContext != null) Entities.destroyAll(managementContext); + if (objectStore != null) objectStore.deleteCompletely(); + } + ++ // The web-console could still be polling (e.g. if have just restarted brooklyn), before the persister is set. ++ // Must not throw NPE, but instead return something sensible (e.g. an empty state record). ++ @Test ++ public void testGetManagementPlaneSyncStateDoesNotThrowNpeBeforePersisterSet() throws Exception { ++ HighAvailabilityManagerImpl manager2 = new HighAvailabilityManagerImpl(managementContext) ++ .setPollPeriod(Duration.millis(10)) ++ .setHeartbeatTimeout(Duration.THIRTY_SECONDS) ++ .setPromotionListener(promotionListener) ++ .setTicker(ticker); ++ try { ++ ManagementPlaneSyncRecord state = manager2.getManagementPlaneSyncState(); ++ assertNotNull(state); ++ } finally { ++ manager2.stop(); ++ } ++ ++ } + // Can get a log.error about our management node's heartbeat being out of date. Caused by + // poller first writing a heartbeat record, and then the clock being incremented. But the + // next poll fixes it. + public void testPromotes() throws Exception { + persister.delta(ManagementPlaneSyncRecordDeltaImpl.builder() + .node(newManagerMemento(ownNodeId, ManagementNodeState.STANDBY, tickerCurrentMillis())) + .node(newManagerMemento("node1", ManagementNodeState.MASTER, tickerCurrentMillis())) + .setMaster("node1") + .build()); + + manager.start(HighAvailabilityMode.AUTO); + + // Simulate passage of time; ticker used by this HA-manager so it will "correctly" publish + // its own heartbeat with the new time; but node1's record is now out-of-date. + tickerAdvance(Duration.seconds(31)); + + // Expect to be notified of our promotion, as the only other node + promotionListener.assertCalledEventually(); + } + + @Test(groups="Integration") // because one second wait in succeedsContinually + public void testDoesNotPromoteIfMasterTimeoutNotExpired() throws Exception { + persister.delta(ManagementPlaneSyncRecordDeltaImpl.builder() + .node(newManagerMemento(ownNodeId, ManagementNodeState.STANDBY, tickerCurrentMillis())) + .node(newManagerMemento("node1", ManagementNodeState.MASTER, tickerCurrentMillis())) + .setMaster("node1") + .build()); + + manager.start(HighAvailabilityMode.AUTO); + + tickerAdvance(Duration.seconds(29)); + + // Expect not to be notified, as 29s < 30s timeout (it's a fake clock so won't hit 30, even waiting 1s below) + Asserts.succeedsContinually(new Runnable() { + @Override public void run() { + assertTrue(promotionListener.callTimestamps.isEmpty(), "calls="+promotionListener.callTimestamps); + }}); + } + + public void testGetManagementPlaneStatus() throws Exception { + // with the name zzzzz the mgr created here should never be promoted by the alphabetical strategy! + + tickerAdvance(Duration.FIVE_SECONDS); + persister.delta(ManagementPlaneSyncRecordDeltaImpl.builder() + .node(newManagerMemento("zzzzzzz_node1", ManagementNodeState.STANDBY, tickerCurrentMillis())) + .build()); + long zzzTime = tickerCurrentMillis(); + tickerAdvance(Duration.FIVE_SECONDS); + + manager.start(HighAvailabilityMode.AUTO); + ManagementPlaneSyncRecord memento = manager.getManagementPlaneSyncState(); + + // Note can assert timestamp because not "real" time; it's using our own Ticker + assertEquals(memento.getMasterNodeId(), ownNodeId); + assertEquals(memento.getManagementNodes().keySet(), ImmutableSet.of(ownNodeId, "zzzzzzz_node1")); + assertEquals(memento.getManagementNodes().get(ownNodeId).getNodeId(), ownNodeId); + assertEquals(memento.getManagementNodes().get(ownNodeId).getStatus(), ManagementNodeState.MASTER); + assertEquals(memento.getManagementNodes().get(ownNodeId).getTimestampUtc(), tickerCurrentMillis()); + assertEquals(memento.getManagementNodes().get("zzzzzzz_node1").getNodeId(), "zzzzzzz_node1"); + assertEquals(memento.getManagementNodes().get("zzzzzzz_node1").getStatus(), ManagementNodeState.STANDBY); + assertEquals(memento.getManagementNodes().get("zzzzzzz_node1").getTimestampUtc(), zzzTime); + } + + @Test(groups="Integration", invocationCount=50) //because we have had non-deterministic failures + public void testGetManagementPlaneStatusManyTimes() throws Exception { + testGetManagementPlaneStatus(); + } + + @Test + public void testGetManagementPlaneSyncStateInfersTimedOutNodeAsFailed() throws Exception { + persister.delta(ManagementPlaneSyncRecordDeltaImpl.builder() + .node(newManagerMemento(ownNodeId, ManagementNodeState.STANDBY, tickerCurrentMillis())) + .node(newManagerMemento("node1", ManagementNodeState.MASTER, tickerCurrentMillis())) + .setMaster("node1") + .build()); + + manager.start(HighAvailabilityMode.AUTO); + + ManagementPlaneSyncRecord state = manager.getManagementPlaneSyncState(); + assertEquals(state.getManagementNodes().get("node1").getStatus(), ManagementNodeState.MASTER); + assertEquals(state.getManagementNodes().get(ownNodeId).getStatus(), ManagementNodeState.STANDBY); + + // Simulate passage of time; ticker used by this HA-manager so it will "correctly" publish + // its own heartbeat with the new time; but node1's record is now out-of-date. + tickerAdvance(Duration.seconds(31)); + + ManagementPlaneSyncRecord state2 = manager.getManagementPlaneSyncState(); + assertEquals(state2.getManagementNodes().get("node1").getStatus(), ManagementNodeState.FAILED); + assertNotEquals(state.getManagementNodes().get(ownNodeId).getStatus(), ManagementNodeState.FAILED); + } + + private long tickerCurrentMillis() { + return ticker.read(); + } + + private long tickerAdvance(Duration duration) { + currentTime.addAndGet(duration.toMilliseconds()); + return tickerCurrentMillis(); + } + + private ManagementNodeSyncRecord newManagerMemento(String nodeId, ManagementNodeState status, long timestamp) { + return BasicManagementNodeSyncRecord.builder().brooklynVersion(BrooklynVersion.get()).nodeId(nodeId).status(status).timestampUtc(timestamp).build(); + } + + public static class RecordingPromotionListener implements PromotionListener { + public final List<Long> callTimestamps = Lists.newCopyOnWriteArrayList(); + + @Override + public void promotingToMaster() { + callTimestamps.add(System.currentTimeMillis()); + } + + public void assertNotCalled() { + assertTrue(callTimestamps.isEmpty(), "calls="+callTimestamps); + } + + public void assertCalled() { + assertFalse(callTimestamps.isEmpty(), "calls="+callTimestamps); + } + + public void assertCalledEventually() { + Asserts.succeedsEventually(new Runnable() { + @Override public void run() { + assertCalled(); + }}); + } + }; +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/458a117c/usage/launcher/src/main/java/brooklyn/launcher/BrooklynLauncher.java ----------------------------------------------------------------------