http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6602f694/core/src/test/java/org/apache/brooklyn/core/management/ha/HighAvailabilityManagerSplitBrainTest.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/brooklyn/core/management/ha/HighAvailabilityManagerSplitBrainTest.java
 
b/core/src/test/java/org/apache/brooklyn/core/management/ha/HighAvailabilityManagerSplitBrainTest.java
new file mode 100644
index 0000000..7e34071
--- /dev/null
+++ 
b/core/src/test/java/org/apache/brooklyn/core/management/ha/HighAvailabilityManagerSplitBrainTest.java
@@ -0,0 +1,474 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.management.ha;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.fail;
+
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.brooklyn.api.entity.proxying.EntitySpec;
+import org.apache.brooklyn.api.location.Location;
+import org.apache.brooklyn.api.management.ha.HighAvailabilityMode;
+import org.apache.brooklyn.api.management.ha.ManagementNodeState;
+import org.apache.brooklyn.api.management.ha.ManagementNodeSyncRecord;
+import org.apache.brooklyn.api.management.ha.ManagementPlaneSyncRecord;
+import 
org.apache.brooklyn.api.management.ha.ManagementPlaneSyncRecordPersister;
+import org.apache.brooklyn.core.management.ha.HighAvailabilityManagerImpl;
+import 
org.apache.brooklyn.core.management.ha.ManagementPlaneSyncRecordPersisterToObjectStore;
+import 
org.apache.brooklyn.core.management.ha.TestEntityFailingRebind.RebindException;
+import org.apache.brooklyn.core.management.internal.ManagementContextInternal;
+import org.apache.brooklyn.test.entity.LocalManagementContextForTests;
+import org.apache.brooklyn.test.entity.TestApplication;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import brooklyn.entity.basic.ApplicationBuilder;
+import brooklyn.entity.basic.Entities;
+import brooklyn.entity.rebind.PersistenceExceptionHandlerImpl;
+import brooklyn.entity.rebind.persister.BrooklynMementoPersisterToObjectStore;
+import brooklyn.entity.rebind.persister.InMemoryObjectStore;
+import brooklyn.entity.rebind.persister.ListeningObjectStore;
+import brooklyn.entity.rebind.persister.PersistMode;
+import brooklyn.entity.rebind.persister.PersistenceObjectStore;
+import brooklyn.internal.BrooklynFeatureEnablement;
+import brooklyn.test.Asserts;
+import brooklyn.util.collections.MutableList;
+import brooklyn.util.collections.MutableMap;
+import brooklyn.util.exceptions.Exceptions;
+import brooklyn.util.time.Duration;
+import brooklyn.util.time.Time;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.base.Ticker;
+import com.google.common.collect.ImmutableList;
+
+@Test
+public class HighAvailabilityManagerSplitBrainTest {
+
+    private static final Logger log = 
LoggerFactory.getLogger(HighAvailabilityManagerSplitBrainTest.class);
+    
+    private List<HaMgmtNode> nodes = new 
MutableList<HighAvailabilityManagerSplitBrainTest.HaMgmtNode>();
+    Map<String,String> sharedBackingStore = MutableMap.of();
+    Map<String,Date> sharedBackingStoreDates = MutableMap.of();
+    private AtomicLong sharedTime; // used to set the ticker's return value
+    private ClassLoader classLoader = getClass().getClassLoader();
+    
+    public class HaMgmtNode {
+        // TODO share with HotStandbyTest and WarmStandbyTest and a few others 
(minor differences but worth it ultimately)
+
+        private ManagementContextInternal mgmt;
+        private String ownNodeId;
+        private String nodeName;
+        private ListeningObjectStore objectStore;
+        private ManagementPlaneSyncRecordPersister persister;
+        private HighAvailabilityManagerImpl ha;
+        private Ticker ticker;
+        private AtomicLong currentTime; // used to set the ticker's return 
value
+
+        public void setUp() throws Exception {
+            if (sharedTime==null)
+                currentTime = new AtomicLong(System.currentTimeMillis());
+            
+            ticker = new Ticker() {
+                // strictly not a ticker because returns millis UTC, but it 
works fine even so
+                @Override public long read() {
+                    if (sharedTime!=null) return sharedTime.get();
+                    return currentTime.get();
+                }
+            };
+            
+            nodeName = "node "+nodes.size();
+            mgmt = newLocalManagementContext();
+            ownNodeId = mgmt.getManagementNodeId();
+            objectStore = new 
ListeningObjectStore(newPersistenceObjectStore());
+            objectStore.injectManagementContext(mgmt);
+            objectStore.prepareForSharedUse(PersistMode.CLEAN, 
HighAvailabilityMode.DISABLED);
+            persister = new 
ManagementPlaneSyncRecordPersisterToObjectStore(mgmt, objectStore, classLoader);
+            
((ManagementPlaneSyncRecordPersisterToObjectStore)persister).preferRemoteTimestampInMemento();
+            BrooklynMementoPersisterToObjectStore persisterObj = new 
BrooklynMementoPersisterToObjectStore(objectStore, 
mgmt.getBrooklynProperties(), classLoader);
+            mgmt.getRebindManager().setPersister(persisterObj, 
PersistenceExceptionHandlerImpl.builder().build());
+            ha = 
((HighAvailabilityManagerImpl)mgmt.getHighAvailabilityManager())
+                .setPollPeriod(Duration.PRACTICALLY_FOREVER)
+                .setHeartbeatTimeout(Duration.THIRTY_SECONDS)
+                .setLocalTicker(ticker)
+                .setRemoteTicker(ticker)
+                .setPersister(persister);
+            log.info("Created "+nodeName+" "+ownNodeId);
+        }
+        
+        public void tearDown() throws Exception {
+            if (ha != null) ha.stop();
+            if (mgmt != null) Entities.destroyAll(mgmt);
+            if (objectStore != null) objectStore.deleteCompletely();
+        }
+        
+        private long tickerCurrentMillis() {
+            return ticker.read();
+        }
+        
+        private long tickerAdvance(Duration duration) {
+            if (sharedTime!=null)
+                throw new IllegalStateException("Using shared ticker; cannot 
advance private node clock");
+            currentTime.addAndGet(duration.toMilliseconds());
+            return tickerCurrentMillis();
+        }
+        
+        @Override
+        public String toString() {
+            return nodeName+" "+ownNodeId;
+        }
+    }
+    
+    private Boolean prevThrowOnRebind;
+    
+    @BeforeMethod(alwaysRun=true)
+    public void setUp() throws Exception {
+        prevThrowOnRebind = TestEntityFailingRebind.getThrowOnRebind();
+        TestEntityFailingRebind.setThrowOnRebind(true);
+        nodes.clear();
+        sharedBackingStore.clear();
+    }
+    
+    @AfterMethod(alwaysRun=true)
+    public void tearDown() throws Exception {
+        try {
+            for (HaMgmtNode n: nodes)
+                n.tearDown();
+        } finally {
+            if (prevThrowOnRebind != null) 
TestEntityFailingRebind.setThrowOnRebind(prevThrowOnRebind);
+        }
+    }
+
+    public HaMgmtNode newNode() throws Exception {
+        HaMgmtNode node = new HaMgmtNode();
+        node.setUp();
+        nodes.add(node);
+        return node;
+    }
+
+    private void sharedTickerAdvance(Duration duration) {
+        if (sharedTime==null) {
+            for (HaMgmtNode n: nodes)
+                n.tickerAdvance(duration);
+        } else {
+            sharedTime.addAndGet(duration.toMilliseconds());
+        }
+    }
+    
+    private long sharedTickerCurrentMillis() {
+        return sharedTime.get();
+    }
+    
+    protected void useSharedTime() {
+        if (!nodes.isEmpty())
+            throw new IllegalStateException("shared time must be set up before 
any nodes created");
+        sharedTime = new AtomicLong(System.currentTimeMillis());
+    }
+
+    protected ManagementContextInternal newLocalManagementContext() {
+        return new LocalManagementContextForTests();
+    }
+
+    protected PersistenceObjectStore newPersistenceObjectStore() {
+        return new InMemoryObjectStore(sharedBackingStore, 
sharedBackingStoreDates);
+    }
+    
+    @Test
+    public void testDoubleRebindFails() throws Exception {
+        useSharedTime();
+        HaMgmtNode n1 = newNode();
+        HaMgmtNode n2 = newNode();
+
+        // first auto should become master
+        n1.ha.start(HighAvailabilityMode.AUTO);
+        n2.ha.start(HighAvailabilityMode.AUTO);
+        assertEquals(n1.ha.getNodeState(), ManagementNodeState.MASTER);
+
+        TestApplication app = ApplicationBuilder.newManagedApp(
+                
EntitySpec.create(TestApplication.class).impl(TestEntityFailingRebind.class), 
n1.mgmt);
+        app.start(ImmutableList.<Location>of());
+
+        n1.mgmt.getRebindManager().forcePersistNow(false, null);
+
+        //don't publish state for a while (i.e. long store delays, failures)
+        sharedTickerAdvance(Duration.ONE_MINUTE);
+
+        try {
+            n2.ha.publishAndCheck(false);
+            fail("n2 rebind failure expected");
+        } catch (Exception e) {
+            assertNestedRebindException(e);
+        }
+
+        // re-check should re-assert successfully, no rebind expected as he 
was previously master
+        n1.ha.publishAndCheck(false);
+        ManagementPlaneSyncRecord memento;
+        memento = n1.ha.loadManagementPlaneSyncRecord(true);
+        
assertEquals(memento.getManagementNodes().get(n1.ownNodeId).getStatus(), 
ManagementNodeState.MASTER);
+        
assertEquals(memento.getManagementNodes().get(n2.ownNodeId).getStatus(), 
ManagementNodeState.FAILED);
+
+        // hot backup permitted by the TestEntityFailingRebind
+        n1.ha.changeMode(HighAvailabilityMode.HOT_BACKUP);
+        memento = n1.ha.loadManagementPlaneSyncRecord(true);
+        
assertEquals(memento.getManagementNodes().get(n1.ownNodeId).getStatus(), 
ManagementNodeState.HOT_BACKUP);
+        try {
+            n1.ha.changeMode(HighAvailabilityMode.MASTER);
+            fail("n1 rebind failure expected");
+        } catch (Exception e) {
+            assertNestedRebindException(e);
+        }
+
+        memento = n1.ha.loadManagementPlaneSyncRecord(true);
+        
assertEquals(memento.getManagementNodes().get(n1.ownNodeId).getStatus(), 
ManagementNodeState.FAILED);
+        
assertEquals(memento.getManagementNodes().get(n2.ownNodeId).getStatus(), 
ManagementNodeState.FAILED);
+    }
+
+    @Test
+    public void testStandbyRebind() throws Exception {
+        useSharedTime();
+        HaMgmtNode n1 = newNode();
+        HaMgmtNode n2 = newNode();
+
+        // first auto should become master
+        n1.ha.start(HighAvailabilityMode.AUTO);
+        n2.ha.start(HighAvailabilityMode.AUTO);
+
+        TestApplication app = ApplicationBuilder.newManagedApp(
+                
EntitySpec.create(TestApplication.class).impl(TestEntityFailingRebind.class), 
n1.mgmt);
+        app.start(ImmutableList.<Location>of());
+
+        n1.mgmt.getRebindManager().forcePersistNow(false, null);
+
+        //don't publish state for a while (i.e. long store delays, failures)
+        sharedTickerAdvance(Duration.ONE_MINUTE);
+
+        try {
+            n2.ha.publishAndCheck(false);
+            fail("n2 rebind failure expected");
+        } catch (Exception e) {
+            assertNestedRebindException(e);
+        }
+
+        TestEntityFailingRebind.setThrowOnRebind(false);
+        n1.ha.publishAndCheck(false);
+
+        ManagementPlaneSyncRecord memento = 
n1.ha.loadManagementPlaneSyncRecord(true);
+        
assertEquals(memento.getManagementNodes().get(n1.ownNodeId).getStatus(), 
ManagementNodeState.MASTER);
+        
assertEquals(memento.getManagementNodes().get(n2.ownNodeId).getStatus(), 
ManagementNodeState.FAILED);
+    }
+    
+    private void assertNestedRebindException(Throwable t) {
+        Throwable ptr = t;
+        while (ptr != null) {
+            if (ptr instanceof RebindException) {
+                return;
+            }
+            ptr = ptr.getCause();
+        }
+        Exceptions.propagate(t);
+    }
+    
+    @Test
+    public void testIfNodeStopsBeingAbleToWrite() throws Exception {
+        useSharedTime();
+        log.info("time at start "+sharedTickerCurrentMillis());
+        
+        HaMgmtNode n1 = newNode();
+        HaMgmtNode n2 = newNode();
+        
+        // first auto should become master
+        n1.ha.start(HighAvailabilityMode.AUTO);
+        ManagementPlaneSyncRecord memento1 = 
n1.ha.loadManagementPlaneSyncRecord(true);
+        
+        log.info(n1+" HA: "+memento1);
+        assertEquals(memento1.getMasterNodeId(), n1.ownNodeId);
+        Long time0 = sharedTickerCurrentMillis();
+        
assertEquals(memento1.getManagementNodes().get(n1.ownNodeId).getRemoteTimestamp(),
 time0);
+        
assertEquals(memento1.getManagementNodes().get(n1.ownNodeId).getStatus(), 
ManagementNodeState.MASTER);
+
+        // second - make explicit hot; that's a strictly more complex case 
than cold standby, so provides pretty good coverage
+        n2.ha.start(HighAvailabilityMode.HOT_STANDBY);
+        ManagementPlaneSyncRecord memento2 = 
n2.ha.loadManagementPlaneSyncRecord(true);
+        
+        log.info(n2+" HA: "+memento2);
+        assertEquals(memento2.getMasterNodeId(), n1.ownNodeId);
+        
assertEquals(memento2.getManagementNodes().get(n1.ownNodeId).getStatus(), 
ManagementNodeState.MASTER);
+        
assertEquals(memento2.getManagementNodes().get(n2.ownNodeId).getStatus(), 
ManagementNodeState.HOT_STANDBY);
+        
assertEquals(memento2.getManagementNodes().get(n1.ownNodeId).getRemoteTimestamp(),
 time0);
+        
assertEquals(memento2.getManagementNodes().get(n2.ownNodeId).getRemoteTimestamp(),
 time0);
+        
+        // and no entities at either
+        assertEquals(n1.mgmt.getApplications().size(), 0);
+        assertEquals(n2.mgmt.getApplications().size(), 0);
+
+        // create
+        TestApplication app = 
ApplicationBuilder.newManagedApp(EntitySpec.create(TestApplication.class), 
n1.mgmt);
+        app.start(ImmutableList.<Location>of());
+        app.setAttribute(TestApplication.MY_ATTRIBUTE, "hello");
+        
+        assertEquals(n1.mgmt.getApplications().size(), 1);
+        assertEquals(n2.mgmt.getApplications().size(), 0);
+        log.info("persisting "+n1.ownNodeId);
+        n1.mgmt.getRebindManager().forcePersistNow(false, null);
+        
+        n1.objectStore.setWritesFailSilently(true);
+        log.info(n1+" writes off");
+        sharedTickerAdvance(Duration.ONE_MINUTE);
+        log.info("time now "+sharedTickerCurrentMillis());
+        Long time1 = sharedTickerCurrentMillis();
+        
+        log.info("publish "+n2.ownNodeId);
+        n2.ha.publishAndCheck(false);
+        ManagementPlaneSyncRecord memento2b = 
n2.ha.loadManagementPlaneSyncRecord(true);
+        log.info(n2+" HA now: "+memento2b);
+        
+        // n2 infers n1 as failed 
+        
assertEquals(memento2b.getManagementNodes().get(n1.ownNodeId).getStatus(), 
ManagementNodeState.FAILED);
+        
assertEquals(memento2b.getManagementNodes().get(n2.ownNodeId).getStatus(), 
ManagementNodeState.MASTER);
+        assertEquals(memento2b.getMasterNodeId(), n2.ownNodeId);
+        
assertEquals(memento2b.getManagementNodes().get(n1.ownNodeId).getRemoteTimestamp(),
 time0);
+        
assertEquals(memento2b.getManagementNodes().get(n2.ownNodeId).getRemoteTimestamp(),
 time1);
+        
+        assertEquals(n1.mgmt.getApplications().size(), 1);
+        assertEquals(n2.mgmt.getApplications().size(), 1);
+        
assertEquals(n1.mgmt.getApplications().iterator().next().getAttribute(TestApplication.MY_ATTRIBUTE),
 "hello");
+        
+        n1.objectStore.setWritesFailSilently(false);
+        log.info(n1+" writes on");
+        
+        sharedTickerAdvance(Duration.ONE_SECOND);
+        log.info("time now "+sharedTickerCurrentMillis());
+        Long time2 = sharedTickerCurrentMillis();
+        
+        log.info("publish "+n1.ownNodeId);
+        n1.ha.publishAndCheck(false);
+        ManagementPlaneSyncRecord memento1b = 
n1.ha.loadManagementPlaneSyncRecord(true);
+        log.info(n1+" HA now: "+memento1b);
+        
+        ManagementNodeState expectedStateAfterDemotion = 
BrooklynFeatureEnablement.isEnabled(BrooklynFeatureEnablement.FEATURE_DEFAULT_STANDBY_IS_HOT_PROPERTY)
 ?
+            ManagementNodeState.HOT_STANDBY : ManagementNodeState.STANDBY;
+        
+        // n1 comes back and demotes himself 
+        
assertEquals(memento1b.getManagementNodes().get(n1.ownNodeId).getStatus(), 
expectedStateAfterDemotion);
+        
assertEquals(memento1b.getManagementNodes().get(n2.ownNodeId).getStatus(), 
ManagementNodeState.MASTER);
+        assertEquals(memento1b.getMasterNodeId(), n2.ownNodeId);
+        
assertEquals(memento1b.getManagementNodes().get(n1.ownNodeId).getRemoteTimestamp(),
 time2);
+        
assertEquals(memento1b.getManagementNodes().get(n2.ownNodeId).getRemoteTimestamp(),
 time1);
+        
+        // n2 now sees itself as master, with n1 in standby again
+        ManagementPlaneSyncRecord memento2c = 
n2.ha.loadManagementPlaneSyncRecord(true);
+        log.info(n2+" HA now: "+memento2c);
+        
assertEquals(memento2c.getManagementNodes().get(n1.ownNodeId).getStatus(), 
expectedStateAfterDemotion);
+        
assertEquals(memento2c.getManagementNodes().get(n2.ownNodeId).getStatus(), 
ManagementNodeState.MASTER);
+        assertEquals(memento2c.getMasterNodeId(), n2.ownNodeId);
+        
assertEquals(memento2c.getManagementNodes().get(n1.ownNodeId).getRemoteTimestamp(),
 time2);
+        
assertEquals(memento2c.getManagementNodes().get(n2.ownNodeId).getRemoteTimestamp(),
 time2);
+
+        // right number of entities at n2; n1 may or may not depending whether 
hot standby is default
+        assertEquals(n2.mgmt.getApplications().size(), 1);
+        assertEquals(n1.mgmt.getApplications().size(), 
BrooklynFeatureEnablement.isEnabled(BrooklynFeatureEnablement.FEATURE_DEFAULT_STANDBY_IS_HOT_PROPERTY)
 ? 1 : 0);
+    }
+    
+    @Test(invocationCount=50, groups="Integration")
+    public void testIfNodeStopsBeingAbleToWriteManyTimes() throws Exception {
+        testIfNodeStopsBeingAbleToWrite();
+    }
+    
+    @Test
+    public void testSimultaneousStartup() throws Exception {
+        doTestConcurrentStartup(5, null);
+    }
+
+    @Test
+    public void testNearSimultaneousStartup() throws Exception {
+        doTestConcurrentStartup(20, Duration.millis(20));
+    }
+
+    @Test(invocationCount=50, groups="Integration")
+    public void testNearSimultaneousStartupManyTimes() throws Exception {
+        doTestConcurrentStartup(20, Duration.millis(20));
+    }
+
+    protected void doTestConcurrentStartup(int size, final Duration 
staggerStart) throws Exception {
+        useSharedTime();
+        
+        List<Thread> spawned = MutableList.of();
+        for (int i=0; i<size; i++) {
+            final HaMgmtNode n = newNode();
+            Thread t = new Thread() { public void run() {
+                if (staggerStart!=null) 
Time.sleep(staggerStart.multiply(Math.random()));
+                n.ha.start(HighAvailabilityMode.AUTO);
+                n.ha.setPollPeriod(Duration.millis(20));
+            } };
+            spawned.add(t);
+            t.start();
+        }
+
+        try {
+            final Stopwatch timer = Stopwatch.createStarted();
+            Asserts.succeedsEventually(new Runnable() {
+                @Override public void run() {
+                    ManagementPlaneSyncRecord memento = 
nodes.get(0).ha.loadManagementPlaneSyncRecord(true);
+                    List<ManagementNodeState> counts = MutableList.of(), 
savedCounts = MutableList.of();
+                    for (HaMgmtNode n: nodes) {
+                        counts.add(n.ha.getNodeState());
+                        ManagementNodeSyncRecord m = 
memento.getManagementNodes().get(n.ownNodeId);
+                        if (m!=null) {
+                            savedCounts.add(m.getStatus());
+                        }
+                    }
+                    log.info("while starting "+nodes.size()+" nodes: "
+                        +Collections.frequency(counts, 
ManagementNodeState.MASTER)+" M + "
+                        +Collections.frequency(counts, 
ManagementNodeState.HOT_STANDBY)+" hot + "
+                        +Collections.frequency(counts, 
ManagementNodeState.STANDBY)+" warm + "
+                        +Collections.frequency(counts, 
ManagementNodeState.INITIALIZING)+" init; "
+                        + memento.getManagementNodes().size()+" saved, "
+                        +Collections.frequency(savedCounts, 
ManagementNodeState.MASTER)+" M + "
+                        +Collections.frequency(savedCounts, 
ManagementNodeState.HOT_STANDBY)+" hot + "
+                        +Collections.frequency(savedCounts, 
ManagementNodeState.STANDBY)+" warm + "
+                        +Collections.frequency(savedCounts, 
ManagementNodeState.INITIALIZING)+" init");
+
+                    if (timer.isRunning() && 
Duration.of(timer).compareTo(Duration.TEN_SECONDS)>0) {
+                        log.warn("we seem to have a problem stabilizing");  
//handy place to set a suspend-VM breakpoint!
+                        timer.stop();
+                    }
+                    assertEquals(Collections.frequency(counts, 
ManagementNodeState.MASTER), 1);
+                    assertEquals(Collections.frequency(counts, 
ManagementNodeState.HOT_STANDBY)+Collections.frequency(counts, 
ManagementNodeState.STANDBY), nodes.size()-1);
+                    assertEquals(Collections.frequency(savedCounts, 
ManagementNodeState.MASTER), 1);
+                    assertEquals(Collections.frequency(savedCounts, 
ManagementNodeState.HOT_STANDBY)+Collections.frequency(savedCounts, 
ManagementNodeState.STANDBY), nodes.size()-1);
+                }});
+        } catch (Throwable t) {
+            log.warn("Failed to stabilize (rethrowing): "+t, t);
+            throw Exceptions.propagate(t);
+        }
+        
+        for (Thread t: spawned)
+            t.join(Duration.THIRTY_SECONDS.toMilliseconds());
+    }
+    
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6602f694/core/src/test/java/org/apache/brooklyn/core/management/ha/HighAvailabilityManagerTestFixture.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/brooklyn/core/management/ha/HighAvailabilityManagerTestFixture.java
 
b/core/src/test/java/org/apache/brooklyn/core/management/ha/HighAvailabilityManagerTestFixture.java
new file mode 100644
index 0000000..5a7f79a
--- /dev/null
+++ 
b/core/src/test/java/org/apache/brooklyn/core/management/ha/HighAvailabilityManagerTestFixture.java
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.management.ha;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.brooklyn.api.management.ha.HighAvailabilityMode;
+import org.apache.brooklyn.api.management.ha.ManagementNodeState;
+import org.apache.brooklyn.api.management.ha.ManagementNodeSyncRecord;
+import org.apache.brooklyn.api.management.ha.ManagementPlaneSyncRecord;
+import 
org.apache.brooklyn.api.management.ha.ManagementPlaneSyncRecordPersister;
+import org.apache.brooklyn.core.management.ha.HighAvailabilityManagerImpl;
+import 
org.apache.brooklyn.core.management.ha.ManagementPlaneSyncRecordDeltaImpl;
+import 
org.apache.brooklyn.core.management.ha.ManagementPlaneSyncRecordPersisterToObjectStore;
+import 
org.apache.brooklyn.core.management.ha.HighAvailabilityManagerImpl.PromotionListener;
+import org.apache.brooklyn.core.management.internal.ManagementContextInternal;
+import org.apache.brooklyn.test.entity.LocalManagementContextForTests;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import brooklyn.BrooklynVersion;
+import brooklyn.entity.basic.Entities;
+import brooklyn.entity.rebind.PersistenceExceptionHandlerImpl;
+import brooklyn.entity.rebind.persister.BrooklynMementoPersisterToObjectStore;
+import brooklyn.entity.rebind.persister.PersistMode;
+import brooklyn.entity.rebind.persister.PersistenceObjectStore;
+import brooklyn.entity.rebind.plane.dto.BasicManagementNodeSyncRecord;
+import brooklyn.entity.rebind.plane.dto.BasicManagementNodeSyncRecord.Builder;
+import brooklyn.test.Asserts;
+import brooklyn.util.time.Duration;
+
+import com.google.common.base.Ticker;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+
+@Test
+public abstract class HighAvailabilityManagerTestFixture {
+
+    @SuppressWarnings("unused")
+    private static final Logger log = 
LoggerFactory.getLogger(HighAvailabilityManagerTestFixture.class);
+    
+    private ManagementPlaneSyncRecordPersister persister;
+    protected ManagementContextInternal managementContext;
+    private String ownNodeId;
+    private HighAvailabilityManagerImpl manager;
+    private Ticker ticker;
+    private AtomicLong currentTime; // used to set the ticker's return value
+    private RecordingPromotionListener promotionListener;
+    private ClassLoader classLoader = getClass().getClassLoader();
+    private PersistenceObjectStore objectStore;
+    
+    @BeforeMethod(alwaysRun=true)
+    public void setUp() throws Exception {
+        currentTime = new AtomicLong(1000000000L);
+        ticker = new Ticker() {
+            // strictly not a ticker because returns millis UTC, but it works 
fine even so
+            @Override public long read() {
+                return currentTime.get();
+            }
+        };
+        promotionListener = new RecordingPromotionListener();
+        managementContext = newLocalManagementContext();
+        ownNodeId = managementContext.getManagementNodeId();
+        objectStore = newPersistenceObjectStore();
+        objectStore.injectManagementContext(managementContext);
+        objectStore.prepareForSharedUse(PersistMode.CLEAN, 
HighAvailabilityMode.DISABLED);
+        persister = new 
ManagementPlaneSyncRecordPersisterToObjectStore(managementContext, objectStore, 
classLoader);
+        
((ManagementPlaneSyncRecordPersisterToObjectStore)persister).preferRemoteTimestampInMemento();
+        BrooklynMementoPersisterToObjectStore persisterObj = new 
BrooklynMementoPersisterToObjectStore(
+                objectStore, 
+                managementContext.getBrooklynProperties(), 
+                classLoader);
+        managementContext.getRebindManager().setPersister(persisterObj, 
PersistenceExceptionHandlerImpl.builder().build());
+        manager = 
((HighAvailabilityManagerImpl)managementContext.getHighAvailabilityManager())
+                .setPollPeriod(getPollPeriod())
+                .setHeartbeatTimeout(Duration.THIRTY_SECONDS)
+                .setPromotionListener(promotionListener)
+                .setLocalTicker(ticker)
+                .setRemoteTicker(getRemoteTicker())
+                .setPersister(persister);
+        persister.delta(ManagementPlaneSyncRecordDeltaImpl.builder()
+            .node(newManagerMemento(ownNodeId, 
ManagementNodeState.HOT_STANDBY))
+            .build());
+
+    }
+
+    protected ManagementContextInternal newLocalManagementContext() {
+        return LocalManagementContextForTests.newInstance();
+    }
+
+    protected abstract PersistenceObjectStore newPersistenceObjectStore();
+    
+    @AfterMethod(alwaysRun=true)
+    public void tearDown() throws Exception {
+        if (manager != null) manager.stop();
+        if (managementContext != null) Entities.destroyAll(managementContext);
+        if (objectStore != null) objectStore.deleteCompletely();
+    }
+    
+    // The web-console could still be polling (e.g. if have just restarted 
brooklyn), before the persister is set.
+    // Must not throw NPE, but instead return something sensible (e.g. an 
empty state record).
+    @Test
+    public void 
testGetManagementPlaneSyncStateDoesNotThrowNpeBeforePersisterSet() throws 
Exception {
+        HighAvailabilityManagerImpl manager2 = new 
HighAvailabilityManagerImpl(managementContext)
+            .setPollPeriod(Duration.millis(10))
+            .setHeartbeatTimeout(Duration.THIRTY_SECONDS)
+            .setPromotionListener(promotionListener)
+            .setLocalTicker(ticker)
+            .setRemoteTicker(ticker);
+        try {
+            ManagementPlaneSyncRecord state = 
manager2.loadManagementPlaneSyncRecord(true);
+            assertNotNull(state);
+        } finally {
+            manager2.stop();
+        }
+
+    }
+    // Can get a log.error about our management node's heartbeat being out of 
date. Caused by
+    // poller first writing a heartbeat record, and then the clock being 
incremented. But the
+    // next poll fixes it.
+    public void testPromotes() throws Exception {
+        persister.delta(ManagementPlaneSyncRecordDeltaImpl.builder()
+                .node(newManagerMemento(ownNodeId, 
ManagementNodeState.HOT_STANDBY))
+                .node(newManagerMemento("node1", ManagementNodeState.MASTER))
+                .setMaster("node1")
+                .build());
+        
+        manager.start(HighAvailabilityMode.AUTO);
+        
+        // Simulate passage of time; ticker used by this HA-manager so it will 
"correctly" publish
+        // its own heartbeat with the new time; but node1's record is now 
out-of-date.
+        tickerAdvance(Duration.seconds(31));
+        
+        // Expect to be notified of our promotion, as the only other node
+        promotionListener.assertCalledEventually();
+    }
+
+    @Test(groups="Integration") // because one second wait in 
succeedsContinually
+    public void testDoesNotPromoteIfMasterTimeoutNotExpired() throws Exception 
{
+        persister.delta(ManagementPlaneSyncRecordDeltaImpl.builder()
+                .node(newManagerMemento(ownNodeId, 
ManagementNodeState.HOT_STANDBY))
+                .node(newManagerMemento("node1", ManagementNodeState.MASTER))
+                .setMaster("node1")
+                .build());
+        
+        manager.start(HighAvailabilityMode.AUTO);
+        
+        tickerAdvance(Duration.seconds(25));
+        
+        // Expect not to be notified, as 25s < 30s timeout
+        // (it's normally a fake clock so won't hit 30, even waiting 1s below 
- but in "IntegrationTest" subclasses it is real!)
+        Asserts.succeedsContinually(new Runnable() {
+            @Override public void run() {
+                assertTrue(promotionListener.callTimestamps.isEmpty(), 
"calls="+promotionListener.callTimestamps);
+            }});
+    }
+
+    public void testGetManagementPlaneStatus() throws Exception {
+        // with the name zzzzz the mgr created here should never be promoted 
by the alphabetical strategy!
+        
+        tickerAdvance(Duration.FIVE_SECONDS);
+        persister.delta(ManagementPlaneSyncRecordDeltaImpl.builder()
+                .node(newManagerMemento(ownNodeId, 
ManagementNodeState.STANDBY))
+                .node(newManagerMemento("zzzzzzz_node1", 
ManagementNodeState.STANDBY))
+                .build());
+        persister.loadSyncRecord();
+        long zzzTime = tickerCurrentMillis();
+        tickerAdvance(Duration.FIVE_SECONDS);
+        
+        manager.start(HighAvailabilityMode.AUTO);
+        ManagementPlaneSyncRecord memento = 
manager.loadManagementPlaneSyncRecord(true);
+        
+        // Note can assert timestamp because not "real" time; it's using our 
own Ticker
+        assertEquals(memento.getMasterNodeId(), ownNodeId);
+        assertEquals(memento.getManagementNodes().keySet(), 
ImmutableSet.of(ownNodeId, "zzzzzzz_node1"));
+        assertEquals(memento.getManagementNodes().get(ownNodeId).getNodeId(), 
ownNodeId);
+        assertEquals(memento.getManagementNodes().get(ownNodeId).getStatus(), 
ManagementNodeState.MASTER);
+        
assertEquals(memento.getManagementNodes().get(ownNodeId).getLocalTimestamp(), 
tickerCurrentMillis());
+        
assertEquals(memento.getManagementNodes().get("zzzzzzz_node1").getNodeId(), 
"zzzzzzz_node1");
+        
assertEquals(memento.getManagementNodes().get("zzzzzzz_node1").getStatus(), 
ManagementNodeState.STANDBY);
+        
assertEquals(memento.getManagementNodes().get("zzzzzzz_node1").getLocalTimestamp(),
 zzzTime);
+    }
+
+    @Test(groups="Integration", invocationCount=50) //because we have had 
non-deterministic failures 
+    public void testGetManagementPlaneStatusManyTimes() throws Exception {
+        testGetManagementPlaneStatus();
+    }
+    
+    @Test
+    public void testGetManagementPlaneSyncStateInfersTimedOutNodeAsFailed() 
throws Exception {
+        persister.delta(ManagementPlaneSyncRecordDeltaImpl.builder()
+                .node(newManagerMemento(ownNodeId, 
ManagementNodeState.HOT_STANDBY))
+                .node(newManagerMemento("node1", ManagementNodeState.MASTER))
+                .setMaster("node1")
+                .build());
+        
+        manager.start(HighAvailabilityMode.HOT_STANDBY);
+        
+        ManagementPlaneSyncRecord state = 
manager.loadManagementPlaneSyncRecord(true);
+        assertEquals(state.getManagementNodes().get("node1").getStatus(), 
ManagementNodeState.MASTER);
+        assertEquals(state.getManagementNodes().get(ownNodeId).getStatus(), 
ManagementNodeState.HOT_STANDBY);
+        
+        // Simulate passage of time; ticker used by this HA-manager so it will 
"correctly" publish
+        // its own heartbeat with the new time; but node1's record is now 
out-of-date.
+        tickerAdvance(Duration.seconds(31));
+        
+        ManagementPlaneSyncRecord state2 = 
manager.loadManagementPlaneSyncRecord(true);
+        assertEquals(state2.getManagementNodes().get("node1").getStatus(), 
ManagementNodeState.FAILED);
+        assertNotEquals(state.getManagementNodes().get(ownNodeId).getStatus(), 
ManagementNodeState.FAILED);
+    }
+
+    protected Duration getPollPeriod() {
+        return Duration.millis(10);
+    }
+    
+    protected long tickerCurrentMillis() {
+        return ticker.read();
+    }
+    
+    protected long tickerAdvance(Duration duration) {
+        currentTime.addAndGet(duration.toMilliseconds());
+        return tickerCurrentMillis();
+    }
+
+    protected Ticker getRemoteTicker() {
+        return ticker;
+    }
+    
+    protected ManagementNodeSyncRecord newManagerMemento(String nodeId, 
ManagementNodeState status) {
+        Builder rb = BasicManagementNodeSyncRecord.builder();
+        
rb.brooklynVersion(BrooklynVersion.get()).nodeId(nodeId).status(status);
+        rb.localTimestamp(tickerCurrentMillis());
+        if (getRemoteTicker()!=null)
+            rb.remoteTimestamp(getRemoteTicker().read());
+        return rb.build();
+    }
+    
+    public static class RecordingPromotionListener implements 
PromotionListener {
+        public final List<Long> callTimestamps = 
Lists.newCopyOnWriteArrayList();
+        
+        @Override
+        public void promotingToMaster() {
+            callTimestamps.add(System.currentTimeMillis());
+        }
+        
+        public void assertNotCalled() {
+            assertTrue(callTimestamps.isEmpty(), "calls="+callTimestamps);
+        }
+
+        public void assertCalled() {
+            assertFalse(callTimestamps.isEmpty(), "calls="+callTimestamps);
+        }
+
+        public void assertCalledEventually() {
+            Asserts.succeedsEventually(new Runnable() {
+                @Override public void run() {
+                    assertCalled();
+                }});
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6602f694/core/src/test/java/org/apache/brooklyn/core/management/ha/HotStandbyTest.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/brooklyn/core/management/ha/HotStandbyTest.java 
b/core/src/test/java/org/apache/brooklyn/core/management/ha/HotStandbyTest.java
new file mode 100644
index 0000000..da4f998
--- /dev/null
+++ 
b/core/src/test/java/org/apache/brooklyn/core/management/ha/HotStandbyTest.java
@@ -0,0 +1,667 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.management.ha;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+import java.util.ArrayDeque;
+import java.util.Date;
+import java.util.Deque;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+import org.apache.brooklyn.api.entity.Application;
+import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.api.entity.Feed;
+import org.apache.brooklyn.api.entity.proxying.EntitySpec;
+import org.apache.brooklyn.api.location.Location;
+import org.apache.brooklyn.api.location.LocationSpec;
+import org.apache.brooklyn.api.management.ha.HighAvailabilityMode;
+import org.apache.brooklyn.api.management.ha.ManagementNodeState;
+import 
org.apache.brooklyn.api.management.ha.ManagementPlaneSyncRecordPersister;
+import org.apache.brooklyn.core.management.ha.HighAvailabilityManagerImpl;
+import 
org.apache.brooklyn.core.management.ha.ManagementPlaneSyncRecordPersisterToObjectStore;
+import org.apache.brooklyn.core.management.internal.AbstractManagementContext;
+import org.apache.brooklyn.core.management.internal.ManagementContextInternal;
+import org.apache.brooklyn.test.EntityTestUtils;
+import org.apache.brooklyn.test.entity.LocalManagementContextForTests;
+import org.apache.brooklyn.test.entity.TestApplication;
+import org.apache.brooklyn.test.entity.TestEntity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import brooklyn.entity.basic.Entities;
+import brooklyn.entity.rebind.PersistenceExceptionHandlerImpl;
+import brooklyn.entity.rebind.RebindFeedTest.MyEntityWithFunctionFeedImpl;
+import brooklyn.entity.rebind.RebindFeedTest.MyEntityWithNewFeedsEachTimeImpl;
+import brooklyn.entity.rebind.RebindManagerImpl;
+import brooklyn.entity.rebind.RebindTestFixture;
+import brooklyn.entity.rebind.persister.BrooklynMementoPersisterToObjectStore;
+import brooklyn.entity.rebind.persister.InMemoryObjectStore;
+import brooklyn.entity.rebind.persister.ListeningObjectStore;
+import brooklyn.entity.rebind.persister.PersistMode;
+import brooklyn.entity.rebind.persister.PersistenceObjectStore;
+
+import 
org.apache.brooklyn.location.basic.LocalhostMachineProvisioningLocation.LocalhostMachine;
+
+import brooklyn.util.collections.MutableList;
+import brooklyn.util.collections.MutableMap;
+import brooklyn.util.javalang.JavaClassNames;
+import brooklyn.util.repeat.Repeater;
+import brooklyn.util.text.ByteSizeStrings;
+import brooklyn.util.time.Duration;
+import brooklyn.util.time.Time;
+
+import com.google.common.collect.Iterables;
+
+public class HotStandbyTest {
+
+    private static final Logger log = 
LoggerFactory.getLogger(HotStandbyTest.class);
+    
+    private List<HaMgmtNode> nodes = new 
MutableList<HotStandbyTest.HaMgmtNode>();
+    Map<String,String> sharedBackingStore = MutableMap.of();
+    Map<String,Date> sharedBackingStoreDates = MutableMap.of();
+    private ClassLoader classLoader = getClass().getClassLoader();
+    
+    public class HaMgmtNode {
+        // TODO share with WarmStandbyTest and SplitBrainTest and a few others 
(minor differences but worth it ultimately)
+
+        private ManagementContextInternal mgmt;
+        private String ownNodeId;
+        private String nodeName;
+        private ListeningObjectStore objectStore;
+        private ManagementPlaneSyncRecordPersister persister;
+        private HighAvailabilityManagerImpl ha;
+        private Duration persistOrRebindPeriod = Duration.ONE_SECOND;
+
+        public void setUp() throws Exception {
+            nodeName = "node "+nodes.size();
+            mgmt = newLocalManagementContext();
+            ownNodeId = mgmt.getManagementNodeId();
+            objectStore = new 
ListeningObjectStore(newPersistenceObjectStore());
+            objectStore.injectManagementContext(mgmt);
+            objectStore.prepareForSharedUse(PersistMode.CLEAN, 
HighAvailabilityMode.DISABLED);
+            persister = new 
ManagementPlaneSyncRecordPersisterToObjectStore(mgmt, objectStore, classLoader);
+            
((ManagementPlaneSyncRecordPersisterToObjectStore)persister).preferRemoteTimestampInMemento();
+            BrooklynMementoPersisterToObjectStore persisterObj = new 
BrooklynMementoPersisterToObjectStore(objectStore, 
mgmt.getBrooklynProperties(), classLoader);
+            
((RebindManagerImpl)mgmt.getRebindManager()).setPeriodicPersistPeriod(persistOrRebindPeriod);
+            mgmt.getRebindManager().setPersister(persisterObj, 
PersistenceExceptionHandlerImpl.builder().build());
+            ha = 
((HighAvailabilityManagerImpl)mgmt.getHighAvailabilityManager())
+                .setPollPeriod(Duration.PRACTICALLY_FOREVER)
+                .setHeartbeatTimeout(Duration.THIRTY_SECONDS)
+                .setPersister(persister);
+            log.info("Created "+nodeName+" "+ownNodeId);
+        }
+        
+        public void tearDownThisOnly() throws Exception {
+            if (ha != null) ha.stop();
+            if (mgmt!=null) mgmt.getRebindManager().stop();
+            if (mgmt != null) Entities.destroyAll(mgmt);
+        }
+        
+        public void tearDownAll() throws Exception {
+            tearDownThisOnly();
+            // can't delete the object store until all being torn down
+            if (objectStore != null) objectStore.deleteCompletely();
+        }
+        
+        @Override
+        public String toString() {
+            return nodeName+" "+ownNodeId;
+        }
+
+        public RebindManagerImpl rebinder() {
+            return (RebindManagerImpl)mgmt.getRebindManager();
+        }
+    }
+    
+    @BeforeMethod(alwaysRun=true)
+    public void setUp() throws Exception {
+        nodes.clear();
+        sharedBackingStore.clear();
+    }
+    
+    public HaMgmtNode newNode(Duration persistOrRebindPeriod) throws Exception 
{
+        HaMgmtNode node = new HaMgmtNode();
+        node.persistOrRebindPeriod = persistOrRebindPeriod;
+        node.setUp();
+        nodes.add(node);
+        return node;
+    }
+
+    @AfterMethod(alwaysRun=true)
+    public void tearDown() throws Exception {
+        for (HaMgmtNode n: nodes)
+            n.tearDownAll();
+    }
+
+    protected ManagementContextInternal newLocalManagementContext() {
+        return new LocalManagementContextForTests();
+    }
+
+    protected PersistenceObjectStore newPersistenceObjectStore() {
+        return new InMemoryObjectStore(sharedBackingStore, 
sharedBackingStoreDates);
+    }
+
+    private HaMgmtNode createMaster(Duration persistOrRebindPeriod) throws 
Exception {
+        HaMgmtNode n1 = newNode(persistOrRebindPeriod);
+        n1.ha.start(HighAvailabilityMode.AUTO);
+        assertEquals(n1.ha.getNodeState(), ManagementNodeState.MASTER);
+        return n1;
+    }
+    
+    private HaMgmtNode createHotStandby(Duration rebindPeriod) throws 
Exception {
+        HaMgmtNode n2 = newNode(rebindPeriod);
+        n2.ha.start(HighAvailabilityMode.HOT_STANDBY);
+        assertEquals(n2.ha.getNodeState(), ManagementNodeState.HOT_STANDBY);
+        return n2;
+    }
+
+    private TestApplication createFirstAppAndPersist(HaMgmtNode n1) throws 
Exception {
+        TestApplication app = 
TestApplication.Factory.newManagedInstanceForTests(n1.mgmt);
+        // for testing without enrichers, if desired:
+//        TestApplication app = 
ApplicationBuilder.newManagedApp(EntitySpec.create(TestApplication.class).impl(TestApplicationNoEnrichersImpl.class),
 n1.mgmt);
+        app.setDisplayName("First App");
+        app.start(MutableList.<Location>of());
+        app.config().set(TestEntity.CONF_NAME, "first-app");
+        app.setAttribute(TestEntity.SEQUENCE, 3);
+        
+        forcePersistNow(n1);
+        return app;
+    }
+
+    protected void forcePersistNow(HaMgmtNode n1) {
+        n1.mgmt.getRebindManager().forcePersistNow(false, null);
+    }
+    
+    private Application expectRebindSequenceNumber(HaMgmtNode master, 
HaMgmtNode hotStandby, Application app, int expectedSensorSequenceValue, 
boolean immediate) {
+        Application appRO = hotStandby.mgmt.lookup(app.getId(), 
Application.class);
+
+        if (immediate) {
+            forcePersistNow(master);
+            forceRebindNow(hotStandby);
+            EntityTestUtils.assertAttributeEquals(appRO, TestEntity.SEQUENCE, 
expectedSensorSequenceValue);
+        } else {
+            EntityTestUtils.assertAttributeEqualsEventually(appRO, 
TestEntity.SEQUENCE, expectedSensorSequenceValue);
+        }
+        
+        log.info("got sequence number "+expectedSensorSequenceValue+" from 
"+appRO);
+        
+        // make sure the instance (proxy) is unchanged
+        Application appRO2 = hotStandby.mgmt.lookup(app.getId(), 
Application.class);
+        Assert.assertTrue(appRO2==appRO);
+        
+        return appRO;
+    }
+
+    private void forceRebindNow(HaMgmtNode hotStandby) {
+        hotStandby.mgmt.getRebindManager().rebind(null, null, 
ManagementNodeState.HOT_STANDBY);
+    }
+    
+    @Test
+    public void 
testHotStandbySeesInitialCustomNameConfigAndSensorValueButDoesntAllowChange() 
throws Exception {
+        HaMgmtNode n1 = createMaster(Duration.PRACTICALLY_FOREVER);
+        TestApplication app = createFirstAppAndPersist(n1);
+        HaMgmtNode n2 = createHotStandby(Duration.PRACTICALLY_FOREVER);
+
+        assertEquals(n2.mgmt.getApplications().size(), 1);
+        Application appRO = n2.mgmt.lookup(app.getId(), Application.class);
+        Assert.assertNotNull(appRO);
+        Assert.assertTrue(appRO instanceof TestApplication);
+        assertEquals(appRO.getDisplayName(), "First App");
+        assertEquals(appRO.getConfig(TestEntity.CONF_NAME), "first-app");
+        assertEquals(appRO.getAttribute(TestEntity.SEQUENCE), (Integer)3);
+
+        try {
+            ((TestApplication)appRO).setAttribute(TestEntity.SEQUENCE, 4);
+            Assert.fail("Should not have allowed sensor to be set");
+        } catch (Exception e) {
+            
Assert.assertTrue(e.toString().toLowerCase().contains("read-only"), "Error 
message did not contain expected text: "+e);
+        }
+        assertEquals(appRO.getAttribute(TestEntity.SEQUENCE), (Integer)3);
+    }
+
+    @Test
+    public void testHotStandbySeesChangesToNameConfigAndSensorValue() throws 
Exception {
+        HaMgmtNode n1 = createMaster(Duration.PRACTICALLY_FOREVER);
+        TestApplication app = createFirstAppAndPersist(n1);
+        HaMgmtNode n2 = createHotStandby(Duration.PRACTICALLY_FOREVER);
+
+        assertEquals(n2.mgmt.getApplications().size(), 1);
+        Application appRO = n2.mgmt.lookup(app.getId(), Application.class);
+        Assert.assertNotNull(appRO);
+        assertEquals(appRO.getChildren().size(), 0);
+        
+        // test changes
+
+        app.setDisplayName("First App Renamed");
+        app.config().set(TestEntity.CONF_NAME, "first-app-renamed");
+        app.setAttribute(TestEntity.SEQUENCE, 4);
+
+        appRO = expectRebindSequenceNumber(n1, n2, app, 4, true);
+        assertEquals(n2.mgmt.getEntityManager().getEntities().size(), 1);
+        assertEquals(appRO.getDisplayName(), "First App Renamed");
+        assertEquals(appRO.getConfig(TestEntity.CONF_NAME), 
"first-app-renamed");
+        
+        // and change again for good measure!
+
+        app.setDisplayName("First App");
+        app.config().set(TestEntity.CONF_NAME, "first-app-restored");
+        app.setAttribute(TestEntity.SEQUENCE, 5);
+        
+        appRO = expectRebindSequenceNumber(n1, n2, app, 5, true);
+        assertEquals(n2.mgmt.getEntityManager().getEntities().size(), 1);
+        assertEquals(appRO.getDisplayName(), "First App");
+        assertEquals(appRO.getConfig(TestEntity.CONF_NAME), 
"first-app-restored");
+    }
+
+
+    public void testHotStandbySeesStructuralChangesIncludingRemoval() throws 
Exception {
+        doTestHotStandbySeesStructuralChangesIncludingRemoval(true);
+    }
+    
+    @Test(groups="Integration") // due to time (it waits for background 
persistence)
+    public void testHotStandbyUnforcedSeesStructuralChangesIncludingRemoval() 
throws Exception {
+        doTestHotStandbySeesStructuralChangesIncludingRemoval(false);
+    }
+    
+    public void doTestHotStandbySeesStructuralChangesIncludingRemoval(boolean 
immediate) throws Exception {
+        HaMgmtNode n1 = createMaster(immediate ? Duration.PRACTICALLY_FOREVER 
: Duration.millis(200));
+        TestApplication app = createFirstAppAndPersist(n1);
+        HaMgmtNode n2 = createHotStandby(immediate ? 
Duration.PRACTICALLY_FOREVER : Duration.millis(200));
+
+        assertEquals(n2.mgmt.getApplications().size(), 1);
+        Application appRO = n2.mgmt.lookup(app.getId(), Application.class);
+        Assert.assertNotNull(appRO);
+        assertEquals(appRO.getChildren().size(), 0);
+        
+        // test additions - new child, new app
+        
+        TestEntity child = 
app.addChild(EntitySpec.create(TestEntity.class).configure(TestEntity.CONF_NAME,
 "first-child"));
+        Entities.manage(child);
+        TestApplication app2 = 
TestApplication.Factory.newManagedInstanceForTests(n1.mgmt);
+        app2.config().set(TestEntity.CONF_NAME, "second-app");
+        
+        app.setAttribute(TestEntity.SEQUENCE, 4);
+        appRO = expectRebindSequenceNumber(n1, n2, app, 4, immediate);
+        
+        assertEquals(appRO.getChildren().size(), 1);
+        Entity childRO = Iterables.getOnlyElement(appRO.getChildren());
+        assertEquals(childRO.getId(), child.getId());
+        assertEquals(childRO.getConfig(TestEntity.CONF_NAME), "first-child");
+        
+        assertEquals(n2.mgmt.getApplications().size(), 2);
+        Application app2RO = n2.mgmt.lookup(app2.getId(), Application.class);
+        Assert.assertNotNull(app2RO);
+        assertEquals(app2RO.getConfig(TestEntity.CONF_NAME), "second-app");
+        
+        assertEquals(n2.mgmt.getEntityManager().getEntities().size(), 3);
+        
+        // now test removals
+        
+        Entities.unmanage(child);
+        Entities.unmanage(app2);
+        
+        app.setAttribute(TestEntity.SEQUENCE, 5);
+        appRO = expectRebindSequenceNumber(n1, n2, app, 5, immediate);
+        
+        EntityTestUtils.assertAttributeEqualsEventually(appRO, 
TestEntity.SEQUENCE, 5);
+        assertEquals(n2.mgmt.getEntityManager().getEntities().size(), 1);
+        assertEquals(appRO.getChildren().size(), 0);
+        assertEquals(n2.mgmt.getApplications().size(), 1);
+        Assert.assertNull(n2.mgmt.lookup(app2.getId(), Application.class));
+        Assert.assertNull(n2.mgmt.lookup(child.getId(), Application.class));
+    }
+
+    @Test(groups="Integration", invocationCount=50)
+    public void testHotStandbySeesStructuralChangesIncludingRemovalManyTimes() 
throws Exception {
+        doTestHotStandbySeesStructuralChangesIncludingRemoval(true);
+    }
+
+    Deque<Long> usedMemory = new ArrayDeque<Long>();
+    protected long noteUsedMemory(String message) {
+        Time.sleep(Duration.millis(200));
+        for (HaMgmtNode n: nodes) {
+            
((AbstractManagementContext)n.mgmt).getGarbageCollector().gcIteration();
+        }
+        System.gc(); System.gc();
+        Time.sleep(Duration.millis(50));
+        System.gc(); System.gc();
+        long mem = Runtime.getRuntime().totalMemory() - 
Runtime.getRuntime().freeMemory();
+        usedMemory.addLast(mem);
+        log.info("Memory used - "+message+": 
"+ByteSizeStrings.java().apply(mem));
+        return mem;
+    }
+    public void assertUsedMemoryLessThan(String event, long max) {
+        noteUsedMemory(event);
+        long nowUsed = usedMemory.peekLast();
+        if (nowUsed > max) {
+            // aggressively try to force GC
+            Time.sleep(Duration.ONE_SECOND);
+            usedMemory.removeLast();
+            noteUsedMemory(event+" (extra GC)");
+            nowUsed = usedMemory.peekLast();
+            if (nowUsed > max) {
+                Assert.fail("Too much memory used - 
"+ByteSizeStrings.java().apply(nowUsed)+" > max 
"+ByteSizeStrings.java().apply(max));
+            }
+        }
+    }
+    public void assertUsedMemoryMaxDelta(String event, long deltaMegabytes) {
+        assertUsedMemoryLessThan(event, usedMemory.peekLast() + 
deltaMegabytes*1024*1024);
+    }
+
+    @Test(groups="Integration")
+    public void testHotStandbyDoesNotLeakLotsOfRebinds() throws Exception {
+        log.info("Starting test "+JavaClassNames.niceClassAndMethod());
+        final int DELTA = 2;
+        
+        HaMgmtNode n1 = createMaster(Duration.PRACTICALLY_FOREVER);
+        TestApplication app = createFirstAppAndPersist(n1);
+        long initialUsed = noteUsedMemory("Created app");
+        
+        HaMgmtNode n2 = createHotStandby(Duration.PRACTICALLY_FOREVER);
+        assertUsedMemoryMaxDelta("Standby created", DELTA);
+        
+        forcePersistNow(n1);
+        forceRebindNow(n2);
+        assertUsedMemoryMaxDelta("Persisted and rebinded once", DELTA);
+        
+        for (int i=0; i<10; i++) {
+            forcePersistNow(n1);
+            forceRebindNow(n2);
+        }
+        assertUsedMemoryMaxDelta("Persisted and rebinded 10x", DELTA);
+        
+        for (int i=0; i<1000; i++) {
+            if ((i+1)%100==0) {
+                noteUsedMemory("iteration "+(i+1));
+                usedMemory.removeLast();
+            }
+            forcePersistNow(n1);
+            forceRebindNow(n2);
+        }
+        assertUsedMemoryMaxDelta("Persisted and rebinded 1000x", DELTA);
+        
+        Entities.unmanage(app);
+        forcePersistNow(n1);
+        forceRebindNow(n2);
+        
+        assertUsedMemoryLessThan("And now all unmanaged", initialUsed + 
DELTA*1000*1000);
+    }
+
+    static class BigObject {
+        public BigObject(int sizeBytes) { array = new byte[sizeBytes]; }
+        byte[] array;
+    }
+    
+    @Test(groups="Integration")
+    public void testHotStandbyDoesNotLeakBigObjects() throws Exception {
+        log.info("Starting test "+JavaClassNames.niceClassAndMethod());
+        final int SIZE = 5;
+        final int SIZE_UP_BOUND = SIZE+2;
+        final int SIZE_DOWN_BOUND = SIZE-1;
+        final int GRACE = 2;
+        // the XML persistence uses a lot of space, we approx at between 2x 
and 3c
+        final int SIZE_IN_XML = 3*SIZE;
+        final int SIZE_IN_XML_DOWN = 2*SIZE;
+        
+        HaMgmtNode n1 = createMaster(Duration.PRACTICALLY_FOREVER);
+        TestApplication app = createFirstAppAndPersist(n1);        
+        noteUsedMemory("Finished seeding");
+        Long initialUsed = usedMemory.peekLast();
+        app.config().set(TestEntity.CONF_OBJECT, new 
BigObject(SIZE*1000*1000));
+        assertUsedMemoryMaxDelta("Set a big config object", SIZE_UP_BOUND);
+        forcePersistNow(n1);
+        assertUsedMemoryMaxDelta("Persisted a big config object", SIZE_IN_XML);
+        
+        HaMgmtNode n2 = createHotStandby(Duration.PRACTICALLY_FOREVER);
+        forceRebindNow(n2);
+        assertUsedMemoryMaxDelta("Rebinded", SIZE_UP_BOUND);
+        
+        for (int i=0; i<10; i++)
+            forceRebindNow(n2);
+        assertUsedMemoryMaxDelta("Several more rebinds", GRACE);
+        for (int i=0; i<10; i++) {
+            forcePersistNow(n1);
+            forceRebindNow(n2);
+        }
+        assertUsedMemoryMaxDelta("And more rebinds and more persists", GRACE);
+        
+        app.config().set(TestEntity.CONF_OBJECT, "big is now small");
+        assertUsedMemoryMaxDelta("Big made small at primary", 
-SIZE_DOWN_BOUND);
+        forcePersistNow(n1);
+        assertUsedMemoryMaxDelta("And persisted", -SIZE_IN_XML_DOWN);
+        
+        forceRebindNow(n2);
+        assertUsedMemoryMaxDelta("And at secondary", -SIZE_DOWN_BOUND);
+        
+        Entities.unmanage(app);
+        forcePersistNow(n1);
+        forceRebindNow(n2);
+        
+        assertUsedMemoryLessThan("And now all unmanaged", 
initialUsed+GRACE*1000*1000);
+    }
+
+    @Test(groups="Integration") // because it's slow
+    // Sept 2014 - there is a small leak, of 200 bytes per child created and 
destroyed;
+    // but this goes away when the app is destroyed; it may be a benign record
+    public void 
testHotStandbyDoesNotLeakLotsOfRebindsCreatingAndDestroyingAChildEntity() 
throws Exception {
+        log.info("Starting test "+JavaClassNames.niceClassAndMethod());
+        final int DELTA = 2;
+        
+        HaMgmtNode n1 = createMaster(Duration.PRACTICALLY_FOREVER);
+        TestApplication app = createFirstAppAndPersist(n1);
+        long initialUsed = noteUsedMemory("Created app");
+        
+        HaMgmtNode n2 = createHotStandby(Duration.PRACTICALLY_FOREVER);
+        assertUsedMemoryMaxDelta("Standby created", DELTA);
+        
+        TestEntity lastChild = 
app.addChild(EntitySpec.create(TestEntity.class).configure(TestEntity.CONF_NAME,
 "first-child"));
+        Entities.manage(lastChild);
+        forcePersistNow(n1);
+        forceRebindNow(n2);
+        assertUsedMemoryMaxDelta("Child created and rebinded once", DELTA);
+        
+        for (int i=0; i<1000; i++) {
+            if (i==9 || (i+1)%100==0) {
+                noteUsedMemory("iteration "+(i+1));
+                usedMemory.removeLast();
+            }
+            TestEntity newChild = 
app.addChild(EntitySpec.create(TestEntity.class).configure(TestEntity.CONF_NAME,
 "first-child"));
+            Entities.manage(newChild);
+            Entities.unmanage(lastChild);
+            lastChild = newChild;
+            
+            forcePersistNow(n1);
+            forceRebindNow(n2);
+        }
+        assertUsedMemoryMaxDelta("Persisted and rebinded 1000x", DELTA);
+        
+        Entities.unmanage(app);
+        forcePersistNow(n1);
+        forceRebindNow(n2);
+        
+        assertUsedMemoryLessThan("And now all unmanaged", initialUsed + 
DELTA*1000*1000);
+    }
+    
+    protected void assertHotStandby(HaMgmtNode n1) {
+        assertEquals(n1.ha.getNodeState(), ManagementNodeState.HOT_STANDBY);
+        Assert.assertTrue(n1.rebinder().isReadOnlyRunning());
+        Assert.assertFalse(n1.rebinder().isPersistenceRunning());
+    }
+
+    protected void assertMaster(HaMgmtNode n1) {
+        assertEquals(n1.ha.getNodeState(), ManagementNodeState.MASTER);
+        Assert.assertFalse(n1.rebinder().isReadOnlyRunning());
+        Assert.assertTrue(n1.rebinder().isPersistenceRunning());
+    }
+
+    @Test
+    public void testChangeMode() throws Exception {
+        HaMgmtNode n1 = createMaster(Duration.PRACTICALLY_FOREVER);
+        TestApplication app = createFirstAppAndPersist(n1);
+        HaMgmtNode n2 = createHotStandby(Duration.PRACTICALLY_FOREVER);
+
+        TestEntity child = 
app.addChild(EntitySpec.create(TestEntity.class).configure(TestEntity.CONF_NAME,
 "first-child"));
+        Entities.manage(child);
+        TestApplication app2 = 
TestApplication.Factory.newManagedInstanceForTests(n1.mgmt);
+        app2.config().set(TestEntity.CONF_NAME, "second-app");
+
+        forcePersistNow(n1);
+        n2.ha.setPriority(1);
+        n1.ha.changeMode(HighAvailabilityMode.HOT_STANDBY);
+        
+        // both now hot standby
+        assertHotStandby(n1);
+        assertHotStandby(n2);
+        
+        assertEquals(n1.mgmt.getApplications().size(), 2);
+        Application app2RO = n1.mgmt.lookup(app2.getId(), Application.class);
+        Assert.assertNotNull(app2RO);
+        assertEquals(app2RO.getConfig(TestEntity.CONF_NAME), "second-app");
+        try {
+            ((TestApplication)app2RO).setAttribute(TestEntity.SEQUENCE, 4);
+            Assert.fail("Should not have allowed sensor to be set");
+        } catch (Exception e) {
+            
Assert.assertTrue(e.toString().toLowerCase().contains("read-only"), "Error 
message did not contain expected text: "+e);
+        }
+
+        n1.ha.changeMode(HighAvailabilityMode.AUTO);
+        n2.ha.changeMode(HighAvailabilityMode.HOT_STANDBY, true, false);
+        // both still hot standby (n1 will defer to n2 as it has higher 
priority)
+        assertHotStandby(n1);
+        assertHotStandby(n2);
+        
+        // with priority 1, n2 will now be elected
+        n2.ha.changeMode(HighAvailabilityMode.AUTO);
+        assertHotStandby(n1);
+        assertMaster(n2);
+        
+        assertEquals(n2.mgmt.getApplications().size(), 2);
+        Application app2B = n2.mgmt.lookup(app2.getId(), Application.class);
+        Assert.assertNotNull(app2B);
+        assertEquals(app2B.getConfig(TestEntity.CONF_NAME), "second-app");
+        ((TestApplication)app2B).setAttribute(TestEntity.SEQUENCE, 4);
+        
+        forcePersistNow(n2);
+        forceRebindNow(n1);
+        Application app2BRO = n1.mgmt.lookup(app2.getId(), Application.class);
+        Assert.assertNotNull(app2BRO);
+        EntityTestUtils.assertAttributeEquals(app2BRO, TestEntity.SEQUENCE, 4);
+    }
+
+    @Test(groups="Integration", invocationCount=20)
+    public void testChangeModeManyTimes() throws Exception {
+        testChangeMode();
+    }
+
+    @Test
+    public void testChangeModeToDisabledAndBack() throws Exception {
+        HaMgmtNode n1 = createMaster(Duration.PRACTICALLY_FOREVER);
+        
n1.mgmt.getLocationManager().createLocation(LocationSpec.create(LocalhostMachine.class));
+        @SuppressWarnings("unused")
+        TestApplication app = createFirstAppAndPersist(n1);
+        
+        HaMgmtNode n2 = createHotStandby(Duration.PRACTICALLY_FOREVER);
+        
+        // disabled n1 allows n2 to become master when next we tell it to check
+        n1.ha.changeMode(HighAvailabilityMode.DISABLED);
+        n2.ha.changeMode(HighAvailabilityMode.AUTO);
+        assertMaster(n2);
+        assertEquals(n1.ha.getNodeState(), ManagementNodeState.FAILED);
+        Assert.assertTrue(n1.mgmt.getApplications().isEmpty(), "n1 should have 
had no apps; instead had: "+n1.mgmt.getApplications());
+        Assert.assertTrue(n1.mgmt.getEntityManager().getEntities().isEmpty(), 
"n1 should have had no entities; instead had: 
"+n1.mgmt.getEntityManager().getEntities());
+        
Assert.assertTrue(n1.mgmt.getLocationManager().getLocations().isEmpty(), "n1 
should have had no locations; instead had: 
"+n1.mgmt.getLocationManager().getLocations());
+        
+        // we can now change n1 back to hot_standby
+        n1.ha.changeMode(HighAvailabilityMode.HOT_STANDBY);
+        assertHotStandby(n1);
+        // and it sees apps
+        Assert.assertFalse(n1.mgmt.getApplications().isEmpty(), "n1 should 
have had apps now");
+        
Assert.assertFalse(n1.mgmt.getLocationManager().getLocations().isEmpty(), "n1 
should have had locations now");
+        // and if n2 is disabled, n1 promotes
+        n2.ha.changeMode(HighAvailabilityMode.DISABLED);
+        n1.ha.changeMode(HighAvailabilityMode.AUTO);
+        assertMaster(n1);
+        assertEquals(n2.ha.getNodeState(), ManagementNodeState.FAILED);
+    }
+    
+    @Test
+    public void testHotStandbyDoesNotStartFeeds() throws Exception {
+        HaMgmtNode n1 = createMaster(Duration.PRACTICALLY_FOREVER);
+        TestApplication app = createFirstAppAndPersist(n1);
+        TestEntity entity = 
app.createAndManageChild(EntitySpec.create(TestEntity.class).impl(MyEntityWithFunctionFeedImpl.class));
+        forcePersistNow(n1);
+        Assert.assertTrue(entity.feeds().getFeeds().size() > 0, "Feeds: 
"+entity.feeds().getFeeds());
+        for (Feed feed : entity.feeds().getFeeds()) {
+            assertTrue(feed.isRunning(), "Feed expected running, but it is 
non-running");
+        }
+
+        HaMgmtNode n2 = createHotStandby(Duration.PRACTICALLY_FOREVER);
+        TestEntity entityRO = (TestEntity) n2.mgmt.lookup(entity.getId(), 
Entity.class);
+        Assert.assertTrue(entityRO.feeds().getFeeds().size() > 0, "Feeds: 
"+entity.feeds().getFeeds());
+        for (Feed feedRO : entityRO.feeds().getFeeds()) {
+            assertFalse(feedRO.isRunning(), "Feed expected non-active, but it 
is running");
+        }
+    }
+    
+    @Test(groups="Integration")
+    public void testHotStandbyDoesNotStartFeedsRebindingManyTimes() throws 
Exception {
+        testHotStandbyDoesNotStartFeeds();
+        final HaMgmtNode hsb = createHotStandby(Duration.millis(10));
+        Repeater.create("until 10 rebinds").every(Duration.millis(100)).until(
+            new Callable<Boolean>() {
+                @Override
+                public Boolean call() throws Exception {
+                    return 
((RebindManagerImpl)hsb.mgmt.getRebindManager()).getReadOnlyRebindCount() >= 10;
+                }
+            }).runRequiringTrue();
+        // make sure not too many tasks (allowing 5 for rebind etc; currently 
just 2)
+        RebindTestFixture.waitForTaskCountToBecome(hsb.mgmt, 5);
+    }
+
+    @Test(groups="Integration")
+    public void 
testHotStandbyDoesNotStartFeedsRebindingManyTimesWithAnotherFeedGenerator() 
throws Exception {
+        HaMgmtNode n1 = createMaster(Duration.PRACTICALLY_FOREVER);
+        TestApplication app = createFirstAppAndPersist(n1);
+        TestEntity entity = 
app.createAndManageChild(EntitySpec.create(TestEntity.class).impl(MyEntityWithNewFeedsEachTimeImpl.class));
+        forcePersistNow(n1);
+        Assert.assertTrue(entity.feeds().getFeeds().size() == 4, "Feeds: 
"+entity.feeds().getFeeds());
+        
+        final HaMgmtNode hsb = createHotStandby(Duration.millis(10));
+        Repeater.create("until 10 rebinds").every(Duration.millis(100)).until(
+            new Callable<Boolean>() {
+                @Override
+                public Boolean call() throws Exception {
+                    return 
((RebindManagerImpl)hsb.mgmt.getRebindManager()).getReadOnlyRebindCount() >= 10;
+                }
+            }).runRequiringTrue();
+        // make sure not too many tasks (allowing 5 for rebind etc; currently 
just 2)
+        RebindTestFixture.waitForTaskCountToBecome(hsb.mgmt, 5);
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6602f694/core/src/test/java/org/apache/brooklyn/core/management/ha/ImmutableManagementPlaneSyncRecord.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/brooklyn/core/management/ha/ImmutableManagementPlaneSyncRecord.java
 
b/core/src/test/java/org/apache/brooklyn/core/management/ha/ImmutableManagementPlaneSyncRecord.java
new file mode 100644
index 0000000..2f08057
--- /dev/null
+++ 
b/core/src/test/java/org/apache/brooklyn/core/management/ha/ImmutableManagementPlaneSyncRecord.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.management.ha;
+
+import java.util.Map;
+
+import org.apache.brooklyn.api.management.ha.ManagementNodeSyncRecord;
+import org.apache.brooklyn.api.management.ha.ManagementPlaneSyncRecord;
+
+import com.google.common.base.Objects;
+import com.google.common.collect.ImmutableMap;
+
+public class ImmutableManagementPlaneSyncRecord implements 
ManagementPlaneSyncRecord {
+    private final String masterNodeId;
+    private final Map<String, ManagementNodeSyncRecord> managementNodes;
+
+    ImmutableManagementPlaneSyncRecord(String masterNodeId, Map<String, 
ManagementNodeSyncRecord> nodes) {
+        this.masterNodeId = masterNodeId;
+        this.managementNodes = ImmutableMap.copyOf(nodes);
+    }
+    
+    @Override
+    public String getMasterNodeId() {
+        return masterNodeId;
+    }
+
+    @Override
+    public Map<String, ManagementNodeSyncRecord> getManagementNodes() {
+        return managementNodes;
+    }
+
+    @Override
+    public String toString() {
+        return Objects.toStringHelper(this).add("master", 
masterNodeId).add("nodes", managementNodes.keySet()).toString();
+    }
+    
+    @Override
+    public String toVerboseString() {
+        return Objects.toStringHelper(this).add("master", 
masterNodeId).add("nodes", managementNodes).toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6602f694/core/src/test/java/org/apache/brooklyn/core/management/ha/ManagementPlaneSyncRecordPersisterInMemory.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/brooklyn/core/management/ha/ManagementPlaneSyncRecordPersisterInMemory.java
 
b/core/src/test/java/org/apache/brooklyn/core/management/ha/ManagementPlaneSyncRecordPersisterInMemory.java
new file mode 100644
index 0000000..5791c88
--- /dev/null
+++ 
b/core/src/test/java/org/apache/brooklyn/core/management/ha/ManagementPlaneSyncRecordPersisterInMemory.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.management.ha;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.IOException;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.brooklyn.api.management.ha.ManagementNodeSyncRecord;
+import org.apache.brooklyn.api.management.ha.ManagementPlaneSyncRecord;
+import 
org.apache.brooklyn.api.management.ha.ManagementPlaneSyncRecordPersister;
+import 
org.apache.brooklyn.core.management.ha.ManagementPlaneSyncRecordPersisterToObjectStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import brooklyn.entity.rebind.persister.InMemoryObjectStore;
+import brooklyn.util.time.Duration;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/** @deprecated since 0.7.0 use {@link 
ManagementPlaneSyncRecordPersisterToObjectStore}
+ * with {@link InMemoryObjectStore}
+ * <code>
+ * new ManagementPlaneSyncRecordPersisterToObjectStore(new 
InMemoryObjectStore(), classLoader)
+ * </code> */
+@Deprecated
+public class ManagementPlaneSyncRecordPersisterInMemory implements 
ManagementPlaneSyncRecordPersister {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(ManagementPlaneSyncRecordPersisterInMemory.class);
+
+    private final MutableManagementPlaneSyncRecord memento = new 
MutableManagementPlaneSyncRecord();
+    
+    private volatile boolean running = true;
+    
+    @Override
+    public synchronized void stop() {
+        running = false;
+    }
+
+    @Override
+    public ManagementPlaneSyncRecord loadSyncRecord() throws IOException {
+        if (!running) {
+            throw new IllegalStateException("Persister not running; cannot 
load memento");
+        }
+        
+        return memento.snapshot();
+    }
+    
+    @VisibleForTesting
+    @Override
+    public synchronized void waitForWritesCompleted(Duration timeout) throws 
InterruptedException, TimeoutException {
+        // The synchronized is sufficient - guarantee that no concurrent calls
+        return;
+    }
+
+    @Override
+    public synchronized void delta(Delta delta) {
+        if (!running) {
+            if (LOG.isDebugEnabled()) LOG.debug("Persister not running; 
ignoring checkpointed delta of manager-memento");
+            return;
+        }
+        
+        for (ManagementNodeSyncRecord m : delta.getNodes()) {
+            memento.addNode(m);
+        }
+        for (String id : delta.getRemovedNodeIds()) {
+            memento.deleteNode(id);
+        }
+        switch (delta.getMasterChange()) {
+        case NO_CHANGE:
+            break; // no-op
+        case SET_MASTER:
+            memento.setMasterNodeId(checkNotNull(delta.getNewMasterOrNull()));
+            break;
+        case CLEAR_MASTER:
+            memento.setMasterNodeId(null);
+            break; // no-op
+        default:
+            throw new IllegalStateException("Unknown state for master-change: 
"+delta.getMasterChange());
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6602f694/core/src/test/java/org/apache/brooklyn/core/management/ha/MasterChooserTest.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/brooklyn/core/management/ha/MasterChooserTest.java
 
b/core/src/test/java/org/apache/brooklyn/core/management/ha/MasterChooserTest.java
new file mode 100644
index 0000000..8958c4b
--- /dev/null
+++ 
b/core/src/test/java/org/apache/brooklyn/core/management/ha/MasterChooserTest.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.management.ha;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNull;
+
+import java.util.List;
+
+import org.apache.brooklyn.api.management.ha.ManagementNodeState;
+import org.apache.brooklyn.api.management.ha.ManagementNodeSyncRecord;
+import 
org.apache.brooklyn.core.management.ha.BasicMasterChooser.AlphabeticMasterChooser;
+import org.apache.brooklyn.core.management.ha.BasicMasterChooser.ScoredRecord;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import brooklyn.BrooklynVersion;
+import brooklyn.entity.basic.EntityFunctions;
+import brooklyn.entity.rebind.plane.dto.BasicManagementNodeSyncRecord;
+import brooklyn.util.time.Duration;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+
+public class MasterChooserTest {
+
+    private MutableManagementPlaneSyncRecord memento;
+    private AlphabeticMasterChooser chooser;
+    private long now;
+    
+    @BeforeMethod(alwaysRun=true)
+    public void setUp() throws Exception {
+        memento = new MutableManagementPlaneSyncRecord();
+        chooser = new AlphabeticMasterChooser();
+        now = System.currentTimeMillis();
+    }
+    
+    @Test
+    public void testChoosesFirstAlphanumeric() throws Exception {
+        memento.addNode(newManagerMemento("node1",  
ManagementNodeState.STANDBY, now));
+        memento.addNode(newManagerMemento("node2",  
ManagementNodeState.STANDBY, now));
+        memento.addNode(newManagerMemento("node3",  
ManagementNodeState.STANDBY, now));
+        Duration heartbeatTimeout = Duration.THIRTY_SECONDS;
+        String ownNodeId = "node2";
+        assertEquals(chooser.choose(memento, heartbeatTimeout, 
ownNodeId).getNodeId(), "node1");
+    }
+    
+    @Test
+    public void testReturnsNullIfNoValid() throws Exception {
+        memento.addNode(newManagerMemento("node1", 
ManagementNodeState.STANDBY, now - 31*1000));
+        Duration heartbeatTimeout = Duration.THIRTY_SECONDS;
+        assertNull(chooser.choose(memento, heartbeatTimeout, "node2"));
+    }
+    
+    @Test
+    public void testFiltersOutByHeartbeat() throws Exception {
+        memento.addNode(newManagerMemento("node1", 
ManagementNodeState.STANDBY, now - 31*1000));
+        memento.addNode(newManagerMemento("node2", 
ManagementNodeState.STANDBY, now - 20*1000));
+        memento.addNode(newManagerMemento("node3", 
ManagementNodeState.STANDBY, now));
+        Duration heartbeatTimeout = Duration.THIRTY_SECONDS;
+        assertEquals(getIds(chooser.sort(chooser.filterHealthy(memento, 
heartbeatTimeout, now))), ImmutableList.of("node2", "node3"));
+    }
+    
+    protected static List<String> getIds(List<ScoredRecord<?>> filterHealthy) {
+        return ImmutableList.copyOf(Iterables.transform(filterHealthy, 
EntityFunctions.id()));
+    }
+
+    @Test
+    public void testFiltersOutByStatusNotPreferringMaster() throws Exception {
+        assertEquals(doTestFiltersOutByStatus(false, false), 
ImmutableList.of("node4", "node5"));
+    }
+    @Test
+    public void testFiltersOutByStatusPreferringMaster() throws Exception {
+        assertEquals(doTestFiltersOutByStatus(true, false), 
ImmutableList.of("node5", "node4"));
+    }
+    
+    @Test
+    public void testFiltersOutByStatusNotPreferringHot() throws Exception {
+        assertEquals(doTestFiltersOutByStatus(false, true), 
ImmutableList.of("node4", "node5", "node6"));
+    }
+    @Test
+    public void testFiltersOutByStatusPreferringHot() throws Exception {
+        assertEquals(doTestFiltersOutByStatus(true, true), 
ImmutableList.of("node5", "node6", "node4"));
+    }
+    
+    protected List<String> doTestFiltersOutByStatus(boolean preferHot, boolean 
includeHot) throws Exception {
+        chooser = new AlphabeticMasterChooser(preferHot);
+        memento.addNode(newManagerMemento("node1", ManagementNodeState.FAILED, 
now));
+        memento.addNode(newManagerMemento("node2", 
ManagementNodeState.TERMINATED, now));
+        memento.addNode(newManagerMemento("node3", null, now));
+        memento.addNode(newManagerMemento("node4",  
ManagementNodeState.STANDBY, now));
+        memento.addNode(newManagerMemento("node5", ManagementNodeState.MASTER, 
now));
+        if (includeHot)
+            memento.addNode(newManagerMemento("node6",  
ManagementNodeState.HOT_STANDBY, now));
+        return getIds(chooser.sort(chooser.filterHealthy(memento, 
Duration.THIRTY_SECONDS, now)));
+    }
+
+    @Test
+    public void testExplicityPriority() throws Exception {
+        chooser = new AlphabeticMasterChooser();
+        memento.addNode(newManagerMemento("node1", 
ManagementNodeState.STANDBY, now, BrooklynVersion.get(), 2L));
+        memento.addNode(newManagerMemento("node2", 
ManagementNodeState.STANDBY, now, BrooklynVersion.get(), -1L));
+        memento.addNode(newManagerMemento("node3", 
ManagementNodeState.STANDBY, now, BrooklynVersion.get(), null));
+        List<String> order = 
getIds(chooser.sort(chooser.filterHealthy(memento, Duration.THIRTY_SECONDS, 
now)));
+        assertEquals(order, ImmutableList.of("node1", "node3", "node2"));
+    }
+
+    @Test
+    public void testVersionsMaybeNull() throws Exception {
+        chooser = new AlphabeticMasterChooser();
+        memento.addNode(newManagerMemento("node1", 
ManagementNodeState.STANDBY, now, "v10", null));
+        memento.addNode(newManagerMemento("node2", 
ManagementNodeState.STANDBY, now, "v3-snapshot", null));
+        memento.addNode(newManagerMemento("node3", 
ManagementNodeState.STANDBY, now, "v3-snapshot", -1L));
+        memento.addNode(newManagerMemento("node4", 
ManagementNodeState.STANDBY, now, "v3-snapshot", null));
+        memento.addNode(newManagerMemento("node5", 
ManagementNodeState.STANDBY, now, "v3-stable", null));
+        memento.addNode(newManagerMemento("node6", 
ManagementNodeState.STANDBY, now, "v1", null));
+        memento.addNode(newManagerMemento("node7", 
ManagementNodeState.STANDBY, now, null, null));
+        List<String> order = 
getIds(chooser.sort(chooser.filterHealthy(memento, Duration.THIRTY_SECONDS, 
now)));
+        assertEquals(order, ImmutableList.of("node1", "node5", "node6", 
"node2", "node4", "node7", "node3"));
+    }
+
+    private ManagementNodeSyncRecord newManagerMemento(String nodeId, 
ManagementNodeState status, long timestamp) {
+        return newManagerMemento(nodeId, status, timestamp, 
BrooklynVersion.get(), null);
+    }
+    private ManagementNodeSyncRecord newManagerMemento(String nodeId, 
ManagementNodeState status, long timestamp,
+            String version, Long priority) {
+        return 
BasicManagementNodeSyncRecord.builder().brooklynVersion(version).nodeId(nodeId).status(status).localTimestamp(timestamp).remoteTimestamp(timestamp).
+            priority(priority).build();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6602f694/core/src/test/java/org/apache/brooklyn/core/management/ha/MutableManagementPlaneSyncRecord.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/brooklyn/core/management/ha/MutableManagementPlaneSyncRecord.java
 
b/core/src/test/java/org/apache/brooklyn/core/management/ha/MutableManagementPlaneSyncRecord.java
new file mode 100644
index 0000000..d9c8943
--- /dev/null
+++ 
b/core/src/test/java/org/apache/brooklyn/core/management/ha/MutableManagementPlaneSyncRecord.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.management.ha;
+
+import java.util.Map;
+
+import org.apache.brooklyn.api.management.ha.ManagementNodeSyncRecord;
+import org.apache.brooklyn.api.management.ha.ManagementPlaneSyncRecord;
+
+import com.google.common.collect.Maps;
+
+public class MutableManagementPlaneSyncRecord implements 
ManagementPlaneSyncRecord {
+    private String masterNodeId;
+    private Map<String, ManagementNodeSyncRecord> managementNodes = 
Maps.newConcurrentMap();
+
+    @Override
+    public String getMasterNodeId() {
+        return masterNodeId;
+    }
+
+    @Override
+    public Map<String, ManagementNodeSyncRecord> getManagementNodes() {
+        return managementNodes;
+    }
+
+    @Override
+    public String toVerboseString() {
+        return toString();
+    }
+
+    public ImmutableManagementPlaneSyncRecord snapshot() {
+        return new ImmutableManagementPlaneSyncRecord(masterNodeId, 
managementNodes);
+    }
+    
+    public void setMasterNodeId(String masterNodeId) {
+        this.masterNodeId = masterNodeId;
+    }
+    
+    public void addNode(ManagementNodeSyncRecord memento) {
+        managementNodes.put(memento.getNodeId(), memento);
+    }
+    
+    public void deleteNode(String nodeId) {
+        managementNodes.remove(nodeId);
+    }
+}
\ No newline at end of file

Reply via email to