Merge remote-tracking branch 'aledsage/fix/rebind-and-ha-20140606' into reviews

Conflicts:
        core/src/main/java/brooklyn/entity/rebind/RebindManagerImpl.java
        
core/src/main/java/brooklyn/management/ha/HighAvailabilityManagerImpl.java
        
core/src/test/java/brooklyn/management/ha/HighAvailabilityManagerTestFixture.java
        usage/launcher/src/main/java/brooklyn/launcher/BrooklynLauncher.java

Mainly different-but-the-same logging and some minor interface/refactoring 
changes


Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/458a117c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/458a117c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/458a117c

Branch: refs/heads/master
Commit: 458a117c39d07546afd76250c0ca44a123e43b32
Parents: 9f42565 85dc83e
Author: Alex Heneveld <alex.henev...@cloudsoftcorp.com>
Authored: Tue Jun 10 01:27:02 2014 +0100
Committer: Alex Heneveld <alex.henev...@cloudsoftcorp.com>
Committed: Tue Jun 10 01:27:02 2014 +0100

----------------------------------------------------------------------
 .../brooklyn/entity/rebind/ChangeListener.java  | 12 +++
 .../brooklyn/entity/rebind/RebindManager.java   |  4 +
 .../mementos/BrooklynMementoPersister.java      |  8 +-
 .../enricher/basic/AbstractEnricher.java        |  8 ++
 .../brooklyn/enricher/basic/Aggregator.java     | 14 +--
 .../java/brooklyn/enricher/basic/Combiner.java  | 14 +--
 .../brooklyn/enricher/basic/Transformer.java    | 13 ++-
 .../brooklyn/entity/basic/AbstractEntity.java   | 18 ++--
 .../entity/basic/EntityDynamicType.java         |  7 +-
 .../entity/rebind/BasicEntityRebindSupport.java |  1 +
 .../rebind/ImmediateDeltaChangeListener.java    | 36 ++++++--
 .../rebind/PeriodicDeltaChangeListener.java     | 38 ++++++--
 .../rebind/RebindContextLookupContext.java      | 21 +----
 .../entity/rebind/RebindManagerImpl.java        | 87 +++++++++++++++++-
 .../entity/rebind/dto/MementosGenerators.java   | 20 ++++-
 .../BrooklynMementoPersisterInMemory.java       | 75 ++++++++++------
 .../rebind/persister/XmlMementoSerializer.java  | 29 +++---
 .../event/feed/AttributePollHandler.java        | 13 ++-
 .../location/basic/AbstractLocation.java        |  6 +-
 .../ha/HighAvailabilityManagerImpl.java         |  9 +-
 .../internal/EntityChangeListener.java          | 18 ++--
 .../internal/EntityManagementSupport.java       | 18 +++-
 .../policy/basic/AbstractEntityAdjunct.java     | 11 ++-
 .../brooklyn/policy/basic/AbstractPolicy.java   |  8 ++
 .../java/brooklyn/enricher/EnrichersTest.java   | 93 ++++++++++++++++++--
 .../entity/rebind/RebindEnricherTest.java       | 44 ++++++++-
 .../entity/rebind/RebindEntityTest.java         | 60 +++++++++++--
 .../entity/rebind/RebindLocationTest.java       | 40 +++++++++
 .../entity/rebind/RebindManagerTest.java        | 45 ++++++++++
 .../entity/rebind/RebindPolicyTest.java         | 88 +++++++++++++++++-
 .../persister/XmlMementoSerializerTest.java     | 32 ++-----
 .../ha/HighAvailabilityManagerTestFixture.java  | 18 ++++
 .../brooklyn/location/jclouds/JcloudsUtil.java  |  5 +-
 .../jclouds/RebindJcloudsLocationTest.java      | 48 ++++++++++
 .../entity/webapp/DynamicWebAppClusterImpl.java | 12 ++-
 .../brooklyn/launcher/BrooklynLauncher.java     |  4 +-
 36 files changed, 800 insertions(+), 177 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/458a117c/core/src/main/java/brooklyn/entity/basic/AbstractEntity.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/458a117c/core/src/main/java/brooklyn/entity/rebind/RebindContextLookupContext.java
----------------------------------------------------------------------
diff --cc 
core/src/main/java/brooklyn/entity/rebind/RebindContextLookupContext.java
index 1c021c7,0000000..392ea06
mode 100644,000000..100644
--- a/core/src/main/java/brooklyn/entity/rebind/RebindContextLookupContext.java
+++ b/core/src/main/java/brooklyn/entity/rebind/RebindContextLookupContext.java
@@@ -1,63 -1,0 +1,50 @@@
 +package brooklyn.entity.rebind;
 +
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- 
 +import brooklyn.entity.Entity;
 +import brooklyn.location.Location;
 +import brooklyn.mementos.BrooklynMementoPersister.LookupContext;
 +import brooklyn.policy.Enricher;
 +import brooklyn.policy.Policy;
 +
 +public class RebindContextLookupContext implements LookupContext {
 +    
-     private static final Logger LOG = 
LoggerFactory.getLogger(RebindContextLookupContext.class);
-     
 +    protected final RebindContext rebindContext;
 +    protected final RebindExceptionHandler exceptionHandler;
 +    
 +    public RebindContextLookupContext(RebindContext rebindContext, 
RebindExceptionHandler exceptionHandler) {
 +        this.rebindContext = rebindContext;
 +        this.exceptionHandler = exceptionHandler;
 +    }
 +    
-     @Override public Entity lookupEntity(Class<?> type, String id) {
++    @Override public Entity lookupEntity(String id) {
 +        Entity result = rebindContext.getEntity(id);
 +        if (result == null) {
 +            result = exceptionHandler.onDanglingEntityRef(id);
-         } else if (type != null && !type.isInstance(result)) {
-             LOG.warn("Entity with id "+id+" does not match type "+type+"; 
returning "+result);
 +        }
 +        return result;
 +    }
 +    
-     @Override public Location lookupLocation(Class<?> type, String id) {
++    @Override public Location lookupLocation(String id) {
 +        Location result = rebindContext.getLocation(id);
 +        if (result == null) {
 +            result = exceptionHandler.onDanglingLocationRef(id);
-         } else if (type != null && !type.isInstance(result)) {
-             LOG.warn("Location with id "+id+" does not match type "+type+"; 
returning "+result);
 +        }
 +        return result;
 +    }
 +    
-     @Override public Policy lookupPolicy(Class<?> type, String id) {
++    @Override public Policy lookupPolicy(String id) {
 +        Policy result = rebindContext.getPolicy(id);
 +        if (result == null) {
 +            result = exceptionHandler.onDanglingPolicyRef(id);
-         } else if (type != null && !type.isInstance(result)) {
-             LOG.warn("Policy with id "+id+" does not match type "+type+"; 
returning "+result);
 +        }
 +        return result;
 +    }
 +    
-     @Override public Enricher lookupEnricher(Class<?> type, String id) {
++    @Override public Enricher lookupEnricher(String id) {
 +        Enricher result = rebindContext.getEnricher(id);
 +        if (result == null) {
 +            result = exceptionHandler.onDanglingEnricherRef(id);
-         } else if (type != null && !type.isInstance(result)) {
-             LOG.warn("Enricher with id "+id+" does not match type "+type+"; 
returning "+result);
 +        }
 +        return result;
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/458a117c/core/src/main/java/brooklyn/entity/rebind/RebindManagerImpl.java
----------------------------------------------------------------------
diff --cc core/src/main/java/brooklyn/entity/rebind/RebindManagerImpl.java
index 658b1c1,c2dc932..c10a17d
--- a/core/src/main/java/brooklyn/entity/rebind/RebindManagerImpl.java
+++ b/core/src/main/java/brooklyn/entity/rebind/RebindManagerImpl.java
@@@ -45,7 -47,7 +47,8 @@@ import brooklyn.util.collections.Mutabl
  import brooklyn.util.exceptions.Exceptions;
  import brooklyn.util.flags.FlagUtils;
  import brooklyn.util.javalang.Reflections;
+ import brooklyn.util.task.BasicExecutionContext;
 +import brooklyn.util.text.Strings;
  import brooklyn.util.time.Duration;
  
  import com.google.common.annotations.VisibleForTesting;

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/458a117c/core/src/main/java/brooklyn/entity/rebind/dto/MementosGenerators.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/458a117c/core/src/main/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterInMemory.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/458a117c/core/src/main/java/brooklyn/management/ha/HighAvailabilityManagerImpl.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/458a117c/core/src/test/java/brooklyn/entity/rebind/RebindEntityTest.java
----------------------------------------------------------------------
diff --cc core/src/test/java/brooklyn/entity/rebind/RebindEntityTest.java
index a3163f4,67c210d..14551d2
--- a/core/src/test/java/brooklyn/entity/rebind/RebindEntityTest.java
+++ b/core/src/test/java/brooklyn/entity/rebind/RebindEntityTest.java
@@@ -522,11 -516,27 +516,27 @@@ public class RebindEntityTest extends R
      }
  
      @Test
+     public void testRebindPersistsDynamicAttribute() throws Exception {
+         final String sensorName = "test.mydynamicsensor";
+         final String sensorDescription = "My description";
+         final AttributeSensor<String> MY_DYNAMIC_SENSOR = new 
BasicAttributeSensor<String>(
+                 String.class, sensorName, sensorDescription);
+ 
+         origApp.setAttribute(MY_DYNAMIC_SENSOR, "myval");
+         
assertEquals(origApp.getEntityType().getSensor(sensorName).getDescription(), 
sensorDescription);
+ 
+         newApp = rebind();
+         
+         assertEquals(newApp.getAttribute(MY_DYNAMIC_SENSOR), "myval");
+         
assertEquals(newApp.getEntityType().getSensor(sensorName).getDescription(), 
sensorDescription);
+     }
+ 
+     @Test
      public void testRebindWhenPreviousAppDestroyedHasNoApp() throws Exception 
{
          origApp.stop();
 -        
 +
          RebindTestUtils.waitForPersisted(origManagementContext);
 -        LocalManagementContext newManagementContext = 
RebindTestUtils.newPersistingManagementContextUnstarted(mementoDir, 
classLoader);
 +        newManagementContext = 
RebindTestUtils.newPersistingManagementContextUnstarted(mementoDir, 
classLoader);
          List<Application> newApps = 
newManagementContext.getRebindManager().rebind(classLoader);
          newManagementContext.getRebindManager().start();
          

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/458a117c/core/src/test/java/brooklyn/management/ha/HighAvailabilityManagerTestFixture.java
----------------------------------------------------------------------
diff --cc 
core/src/test/java/brooklyn/management/ha/HighAvailabilityManagerTestFixture.java
index 9f7601a,0000000..959c0ba
mode 100644,000000..100644
--- 
a/core/src/test/java/brooklyn/management/ha/HighAvailabilityManagerTestFixture.java
+++ 
b/core/src/test/java/brooklyn/management/ha/HighAvailabilityManagerTestFixture.java
@@@ -1,214 -1,0 +1,232 @@@
 +package brooklyn.management.ha;
 +
 +import static org.testng.Assert.assertEquals;
 +import static org.testng.Assert.assertFalse;
 +import static org.testng.Assert.assertNotEquals;
++import static org.testng.Assert.assertNotNull;
 +import static org.testng.Assert.assertTrue;
 +
 +import java.util.List;
 +import java.util.concurrent.atomic.AtomicLong;
 +
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +import org.testng.annotations.AfterMethod;
 +import org.testng.annotations.BeforeMethod;
 +import org.testng.annotations.Test;
 +
 +import brooklyn.BrooklynVersion;
 +import brooklyn.entity.basic.Entities;
 +import brooklyn.entity.rebind.persister.BrooklynMementoPersisterToObjectStore;
 +import brooklyn.entity.rebind.persister.PersistMode;
 +import brooklyn.entity.rebind.persister.PersistenceObjectStore;
 +import brooklyn.entity.rebind.plane.dto.BasicManagementNodeSyncRecord;
 +import brooklyn.management.ha.HighAvailabilityManagerImpl.PromotionListener;
 +import brooklyn.management.internal.ManagementContextInternal;
 +import brooklyn.test.Asserts;
 +import brooklyn.test.entity.LocalManagementContextForTests;
 +import brooklyn.util.time.Duration;
 +
 +import com.google.common.base.Ticker;
 +import com.google.common.collect.ImmutableSet;
 +import com.google.common.collect.Lists;
 +
 +@Test
 +public abstract class HighAvailabilityManagerTestFixture {
 +
 +    @SuppressWarnings("unused")
 +    private static final Logger log = 
LoggerFactory.getLogger(HighAvailabilityManagerTestFixture.class);
 +    
 +    private ManagementPlaneSyncRecordPersister persister;
 +    private ManagementContextInternal managementContext;
 +    private String ownNodeId;
 +    private HighAvailabilityManagerImpl manager;
 +    private Ticker ticker;
 +    private AtomicLong currentTime; // used to set the ticker's return value
 +    private RecordingPromotionListener promotionListener;
 +    private ClassLoader classLoader = getClass().getClassLoader();
 +    private PersistenceObjectStore objectStore;
 +    
 +    @BeforeMethod(alwaysRun=true)
 +    public void setUp() throws Exception {
 +        currentTime = new AtomicLong(System.currentTimeMillis());
 +        ticker = new Ticker() {
 +            // strictly not a ticker because returns millis UTC, but it works 
fine even so
 +            @Override public long read() {
 +                return currentTime.get();
 +            }
 +        };
 +        promotionListener = new RecordingPromotionListener();
 +        managementContext = newLocalManagementContext();
 +        ownNodeId = managementContext.getManagementNodeId();
 +        objectStore = newPersistenceObjectStore();
 +        objectStore.prepareForUse(managementContext, PersistMode.CLEAN);
 +        persister = new 
ManagementPlaneSyncRecordPersisterToObjectStore(managementContext, objectStore, 
classLoader);
 +        BrooklynMementoPersisterToObjectStore persisterObj = new 
BrooklynMementoPersisterToObjectStore(objectStore, classLoader);
 +        managementContext.getRebindManager().setPersister(persisterObj);
 +        manager = new HighAvailabilityManagerImpl(managementContext)
 +                .setPollPeriod(Duration.millis(10))
 +                .setHeartbeatTimeout(Duration.THIRTY_SECONDS)
 +                .setPromotionListener(promotionListener)
 +                .setTicker(ticker)
 +                .setPersister(persister);
 +    }
 +
 +    protected ManagementContextInternal newLocalManagementContext() {
 +        return new LocalManagementContextForTests();
 +    }
 +
 +    protected abstract PersistenceObjectStore newPersistenceObjectStore();
 +    
 +    @AfterMethod(alwaysRun=true)
 +    public void tearDown() throws Exception {
 +        if (manager != null) manager.stop();
 +        if (managementContext != null) Entities.destroyAll(managementContext);
 +        if (objectStore != null) objectStore.deleteCompletely();
 +    }
 +    
++    // The web-console could still be polling (e.g. if have just restarted 
brooklyn), before the persister is set.
++    // Must not throw NPE, but instead return something sensible (e.g. an 
empty state record).
++    @Test
++    public void 
testGetManagementPlaneSyncStateDoesNotThrowNpeBeforePersisterSet() throws 
Exception {
++        HighAvailabilityManagerImpl manager2 = new 
HighAvailabilityManagerImpl(managementContext)
++            .setPollPeriod(Duration.millis(10))
++            .setHeartbeatTimeout(Duration.THIRTY_SECONDS)
++            .setPromotionListener(promotionListener)
++            .setTicker(ticker);
++        try {
++            ManagementPlaneSyncRecord state = 
manager2.getManagementPlaneSyncState();
++            assertNotNull(state);
++        } finally {
++            manager2.stop();
++        }
++
++    }
 +    // Can get a log.error about our management node's heartbeat being out of 
date. Caused by
 +    // poller first writing a heartbeat record, and then the clock being 
incremented. But the
 +    // next poll fixes it.
 +    public void testPromotes() throws Exception {
 +        persister.delta(ManagementPlaneSyncRecordDeltaImpl.builder()
 +                .node(newManagerMemento(ownNodeId, 
ManagementNodeState.STANDBY, tickerCurrentMillis()))
 +                .node(newManagerMemento("node1", ManagementNodeState.MASTER, 
tickerCurrentMillis()))
 +                .setMaster("node1")
 +                .build());
 +        
 +        manager.start(HighAvailabilityMode.AUTO);
 +        
 +        // Simulate passage of time; ticker used by this HA-manager so it 
will "correctly" publish
 +        // its own heartbeat with the new time; but node1's record is now 
out-of-date.
 +        tickerAdvance(Duration.seconds(31));
 +        
 +        // Expect to be notified of our promotion, as the only other node
 +        promotionListener.assertCalledEventually();
 +    }
 +
 +    @Test(groups="Integration") // because one second wait in 
succeedsContinually
 +    public void testDoesNotPromoteIfMasterTimeoutNotExpired() throws 
Exception {
 +        persister.delta(ManagementPlaneSyncRecordDeltaImpl.builder()
 +                .node(newManagerMemento(ownNodeId, 
ManagementNodeState.STANDBY, tickerCurrentMillis()))
 +                .node(newManagerMemento("node1", ManagementNodeState.MASTER, 
tickerCurrentMillis()))
 +                .setMaster("node1")
 +                .build());
 +        
 +        manager.start(HighAvailabilityMode.AUTO);
 +        
 +        tickerAdvance(Duration.seconds(29));
 +        
 +        // Expect not to be notified, as 29s < 30s timeout (it's a fake clock 
so won't hit 30, even waiting 1s below)
 +        Asserts.succeedsContinually(new Runnable() {
 +            @Override public void run() {
 +                assertTrue(promotionListener.callTimestamps.isEmpty(), 
"calls="+promotionListener.callTimestamps);
 +            }});
 +    }
 +
 +    public void testGetManagementPlaneStatus() throws Exception {
 +        // with the name zzzzz the mgr created here should never be promoted 
by the alphabetical strategy!
 +        
 +        tickerAdvance(Duration.FIVE_SECONDS);
 +        persister.delta(ManagementPlaneSyncRecordDeltaImpl.builder()
 +                .node(newManagerMemento("zzzzzzz_node1", 
ManagementNodeState.STANDBY, tickerCurrentMillis()))
 +                .build());
 +        long zzzTime = tickerCurrentMillis();
 +        tickerAdvance(Duration.FIVE_SECONDS);
 +        
 +        manager.start(HighAvailabilityMode.AUTO);
 +        ManagementPlaneSyncRecord memento = 
manager.getManagementPlaneSyncState();
 +        
 +        // Note can assert timestamp because not "real" time; it's using our 
own Ticker
 +        assertEquals(memento.getMasterNodeId(), ownNodeId);
 +        assertEquals(memento.getManagementNodes().keySet(), 
ImmutableSet.of(ownNodeId, "zzzzzzz_node1"));
 +        assertEquals(memento.getManagementNodes().get(ownNodeId).getNodeId(), 
ownNodeId);
 +        assertEquals(memento.getManagementNodes().get(ownNodeId).getStatus(), 
ManagementNodeState.MASTER);
 +        
assertEquals(memento.getManagementNodes().get(ownNodeId).getTimestampUtc(), 
tickerCurrentMillis());
 +        
assertEquals(memento.getManagementNodes().get("zzzzzzz_node1").getNodeId(), 
"zzzzzzz_node1");
 +        
assertEquals(memento.getManagementNodes().get("zzzzzzz_node1").getStatus(), 
ManagementNodeState.STANDBY);
 +        
assertEquals(memento.getManagementNodes().get("zzzzzzz_node1").getTimestampUtc(),
 zzzTime);
 +    }
 +
 +    @Test(groups="Integration", invocationCount=50) //because we have had 
non-deterministic failures 
 +    public void testGetManagementPlaneStatusManyTimes() throws Exception {
 +        testGetManagementPlaneStatus();
 +    }
 +    
 +    @Test
 +    public void testGetManagementPlaneSyncStateInfersTimedOutNodeAsFailed() 
throws Exception {
 +        persister.delta(ManagementPlaneSyncRecordDeltaImpl.builder()
 +                .node(newManagerMemento(ownNodeId, 
ManagementNodeState.STANDBY, tickerCurrentMillis()))
 +                .node(newManagerMemento("node1", ManagementNodeState.MASTER, 
tickerCurrentMillis()))
 +                .setMaster("node1")
 +                .build());
 +        
 +        manager.start(HighAvailabilityMode.AUTO);
 +        
 +        ManagementPlaneSyncRecord state = 
manager.getManagementPlaneSyncState();
 +        assertEquals(state.getManagementNodes().get("node1").getStatus(), 
ManagementNodeState.MASTER);
 +        assertEquals(state.getManagementNodes().get(ownNodeId).getStatus(), 
ManagementNodeState.STANDBY);
 +        
 +        // Simulate passage of time; ticker used by this HA-manager so it 
will "correctly" publish
 +        // its own heartbeat with the new time; but node1's record is now 
out-of-date.
 +        tickerAdvance(Duration.seconds(31));
 +        
 +        ManagementPlaneSyncRecord state2 = 
manager.getManagementPlaneSyncState();
 +        assertEquals(state2.getManagementNodes().get("node1").getStatus(), 
ManagementNodeState.FAILED);
 +        
assertNotEquals(state.getManagementNodes().get(ownNodeId).getStatus(), 
ManagementNodeState.FAILED);
 +    }
 +
 +    private long tickerCurrentMillis() {
 +        return ticker.read();
 +    }
 +    
 +    private long tickerAdvance(Duration duration) {
 +        currentTime.addAndGet(duration.toMilliseconds());
 +        return tickerCurrentMillis();
 +    }
 +
 +    private ManagementNodeSyncRecord newManagerMemento(String nodeId, 
ManagementNodeState status, long timestamp) {
 +        return 
BasicManagementNodeSyncRecord.builder().brooklynVersion(BrooklynVersion.get()).nodeId(nodeId).status(status).timestampUtc(timestamp).build();
 +    }
 +    
 +    public static class RecordingPromotionListener implements 
PromotionListener {
 +        public final List<Long> callTimestamps = 
Lists.newCopyOnWriteArrayList();
 +        
 +        @Override
 +        public void promotingToMaster() {
 +            callTimestamps.add(System.currentTimeMillis());
 +        }
 +        
 +        public void assertNotCalled() {
 +            assertTrue(callTimestamps.isEmpty(), "calls="+callTimestamps);
 +        }
 +
 +        public void assertCalled() {
 +            assertFalse(callTimestamps.isEmpty(), "calls="+callTimestamps);
 +        }
 +
 +        public void assertCalledEventually() {
 +            Asserts.succeedsEventually(new Runnable() {
 +                @Override public void run() {
 +                    assertCalled();
 +                }});
 +        }
 +    };
 +}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/458a117c/usage/launcher/src/main/java/brooklyn/launcher/BrooklynLauncher.java
----------------------------------------------------------------------

Reply via email to