http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/brooklyn/policy/ha/ConnectionFailureDetector.java
----------------------------------------------------------------------
diff --git 
a/policy/src/main/java/brooklyn/policy/ha/ConnectionFailureDetector.java 
b/policy/src/main/java/brooklyn/policy/ha/ConnectionFailureDetector.java
deleted file mode 100644
index ff5d60e..0000000
--- a/policy/src/main/java/brooklyn/policy/ha/ConnectionFailureDetector.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.policy.ha;
-
-import org.apache.brooklyn.api.catalog.Catalog;
-import org.apache.brooklyn.api.event.Sensor;
-import org.apache.brooklyn.core.util.flags.SetFromFlag;
-
-import brooklyn.config.ConfigKey;
-import brooklyn.entity.basic.ConfigKeys;
-import brooklyn.event.basic.BasicConfigKey;
-import brooklyn.event.basic.BasicNotificationSensor;
-import brooklyn.policy.ha.HASensors.FailureDescriptor;
-import brooklyn.util.guava.Maybe;
-import brooklyn.util.net.Networking;
-import brooklyn.util.time.Duration;
-
-import com.google.common.net.HostAndPort;
-
-/**
- * Monitors a given {@link HostAndPort}, to emit HASensors.CONNECTION_FAILED 
and HASensors.CONNECTION_RECOVERED 
- * if the connection is lost/restored.
- */
-@Catalog(name="Connection Failure Detector", description="HA policy for 
monitoring a host:port, "
-        + "emitting an event if the connection is lost/restored")
-public class ConnectionFailureDetector extends AbstractFailureDetector {
-
-    public static final ConfigKey<HostAndPort> ENDPOINT = 
ConfigKeys.newConfigKey(HostAndPort.class, 
"connectionFailureDetector.endpoint");
-
-    public static final ConfigKey<Duration> POLL_PERIOD = 
ConfigKeys.newConfigKey(Duration.class, "connectionFailureDetector.pollPeriod", 
"", Duration.ONE_SECOND);
-
-    public static final BasicNotificationSensor<FailureDescriptor> 
CONNECTION_FAILED = HASensors.CONNECTION_FAILED;
-
-    public static final BasicNotificationSensor<FailureDescriptor> 
CONNECTION_RECOVERED = HASensors.CONNECTION_RECOVERED;
-
-    @SetFromFlag("connectionFailedStabilizationDelay")
-    public static final ConfigKey<Duration> 
CONNECTION_FAILED_STABILIZATION_DELAY = BasicConfigKey.builder(Duration.class)
-            .name("connectionFailureDetector.serviceFailedStabilizationDelay")
-            .description("Time period for which the connection must be 
consistently down for "
-                    + "(e.g. doesn't report down-up-down) before concluding 
failure. "
-                    + "Note that long TCP timeouts mean there can be long 
(e.g. 70 second) "
-                    + "delays in noticing a connection refused condition.")
-            .defaultValue(Duration.ZERO)
-            .build();
-
-    @SetFromFlag("connectionRecoveredStabilizationDelay")
-    public static final ConfigKey<Duration> 
CONNECTION_RECOVERED_STABILIZATION_DELAY = 
BasicConfigKey.builder(Duration.class)
-            
.name("connectionFailureDetector.serviceRecoveredStabilizationDelay")
-            .description("For a failed connection, time period for which the 
connection must be consistently up for (e.g. doesn't report up-down-up) before 
concluding recovered")
-            .defaultValue(Duration.ZERO)
-            .build();
-
-    @Override
-    public void init() {
-        super.init();
-        getRequiredConfig(ENDPOINT); // just to confirm it's set, failing fast
-        if (config().getRaw(SENSOR_FAILED).isAbsent()) {
-            config().set(SENSOR_FAILED, CONNECTION_FAILED);
-        }
-        if (config().getRaw(SENSOR_RECOVERED).isAbsent()) {
-            config().set(SENSOR_RECOVERED, CONNECTION_RECOVERED);
-        }
-    }
-
-    @Override
-    protected CalculatedStatus calculateStatus() {
-        HostAndPort endpoint = getConfig(ENDPOINT);
-        boolean isHealthy = Networking.isReachable(endpoint);
-        return new BasicCalculatedStatus(isHealthy, "endpoint=" + endpoint);
-    }
-
-    //Persistence compatibility overrides
-    @Override
-    protected Duration getPollPeriod() {
-        return getConfig(POLL_PERIOD);
-    }
-
-    @Override
-    protected Duration getFailedStabilizationDelay() {
-        return getConfig(CONNECTION_FAILED_STABILIZATION_DELAY);
-    }
-
-    @Override
-    protected Duration getRecoveredStabilizationDelay() {
-        return getConfig(CONNECTION_RECOVERED_STABILIZATION_DELAY);
-    }
-
-    @SuppressWarnings("unchecked")
-    @Override
-    protected Sensor<FailureDescriptor> getSensorFailed() {
-        Maybe<Object> sensorFailed = config().getRaw(SENSOR_FAILED);
-        if (sensorFailed.isPresent()) {
-            return (Sensor<FailureDescriptor>)sensorFailed.get();
-        } else {
-            return CONNECTION_FAILED;
-        }
-    }
-
-    @SuppressWarnings("unchecked")
-    @Override
-    protected Sensor<FailureDescriptor> getSensorRecovered() {
-        Maybe<Object> sensorRecovered = config().getRaw(SENSOR_RECOVERED);
-        if (sensorRecovered.isPresent()) {
-            return (Sensor<FailureDescriptor>)sensorRecovered.get();
-        } else {
-            return CONNECTION_RECOVERED;
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/brooklyn/policy/ha/HASensors.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/brooklyn/policy/ha/HASensors.java 
b/policy/src/main/java/brooklyn/policy/ha/HASensors.java
deleted file mode 100644
index b940aa0..0000000
--- a/policy/src/main/java/brooklyn/policy/ha/HASensors.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.policy.ha;
-
-import brooklyn.event.basic.BasicNotificationSensor;
-
-import com.google.common.base.Objects;
-
-public class HASensors {
-
-    public static final BasicNotificationSensor<FailureDescriptor> 
ENTITY_FAILED = new BasicNotificationSensor<FailureDescriptor>(
-            FailureDescriptor.class, "ha.entityFailed", "Indicates that an 
entity has failed");
-    
-    public static final BasicNotificationSensor<FailureDescriptor> 
ENTITY_RECOVERED = new BasicNotificationSensor<FailureDescriptor>(
-            FailureDescriptor.class, "ha.entityRecovered", "Indicates that a 
previously failed entity has recovered");
-    
-    public static final BasicNotificationSensor<FailureDescriptor> 
CONNECTION_FAILED = new BasicNotificationSensor<FailureDescriptor>(
-            FailureDescriptor.class, "ha.connectionFailed", "Indicates that a 
connection has failed");
-    
-    public static final BasicNotificationSensor<FailureDescriptor> 
CONNECTION_RECOVERED = new BasicNotificationSensor<FailureDescriptor>(
-            FailureDescriptor.class, "ha.connectionRecovered", "Indicates that 
a previously failed connection has recovered");
-    
-    // TODO How to make this serializable with the entity reference
-    public static class FailureDescriptor {
-        private final Object component;
-        private final String description;
-        
-        public FailureDescriptor(Object component, String description) {
-            this.component = component;
-            this.description = description;
-        }
-        
-        public Object getComponent() {
-            return component;
-        }
-        
-        public String getDescription() {
-            return description;
-        }
-        
-        @Override
-        public String toString() {
-            return Objects.toStringHelper(this).add("component", 
component).add("description", description).toString();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/brooklyn/policy/ha/ServiceFailureDetector.java
----------------------------------------------------------------------
diff --git 
a/policy/src/main/java/brooklyn/policy/ha/ServiceFailureDetector.java 
b/policy/src/main/java/brooklyn/policy/ha/ServiceFailureDetector.java
deleted file mode 100644
index 2e4b719..0000000
--- a/policy/src/main/java/brooklyn/policy/ha/ServiceFailureDetector.java
+++ /dev/null
@@ -1,340 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.policy.ha;
-
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.brooklyn.api.event.SensorEvent;
-import org.apache.brooklyn.core.util.config.ConfigBag;
-import org.apache.brooklyn.core.util.flags.SetFromFlag;
-import org.apache.brooklyn.core.util.task.BasicTask;
-import org.apache.brooklyn.core.util.task.ScheduledTask;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import brooklyn.config.ConfigKey;
-import brooklyn.entity.basic.Attributes;
-import brooklyn.entity.basic.ConfigKeys;
-import brooklyn.entity.basic.EntityInternal;
-import brooklyn.entity.basic.Lifecycle;
-import brooklyn.entity.basic.ServiceStateLogic;
-import brooklyn.entity.basic.ServiceStateLogic.ComputeServiceState;
-import brooklyn.event.basic.BasicConfigKey;
-import brooklyn.event.basic.BasicNotificationSensor;
-import brooklyn.policy.ha.HASensors.FailureDescriptor;
-import brooklyn.util.collections.MutableMap;
-import brooklyn.util.exceptions.Exceptions;
-import brooklyn.util.time.Duration;
-import brooklyn.util.time.Time;
-
-/** 
- * Emits {@link HASensors#ENTITY_FAILED} whenever the parent's default logic 
({@link ComputeServiceState}) would detect a problem,
- * and similarly {@link HASensors#ENTITY_RECOVERED} when recovered.
- * <p>
- * gives more control over suppressing {@link Lifecycle#ON_FIRE}, 
- * for some period of time
- * (or until another process manually sets {@link 
Attributes#SERVICE_STATE_ACTUAL} to {@value Lifecycle#ON_FIRE},
- * which this enricher will not clear until all problems have gone away)
- */
-//@Catalog(name="Service Failure Detector", description="HA policy for 
deteting failure of a service")
-public class ServiceFailureDetector extends 
ServiceStateLogic.ComputeServiceState {
-
-    // TODO Remove duplication between this and MemberFailureDetectionPolicy.
-    // The latter could be re-written to use this. Or could even be deprecated
-    // in favour of this.
-
-    public enum LastPublished {
-        NONE,
-        FAILED,
-        RECOVERED;
-    }
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(ServiceFailureDetector.class);
-
-    private static final long MIN_PERIOD_BETWEEN_EXECS_MILLIS = 100;
-
-    public static final BasicNotificationSensor<FailureDescriptor> 
ENTITY_FAILED = HASensors.ENTITY_FAILED;
-
-    @SetFromFlag("onlyReportIfPreviouslyUp")
-    public static final ConfigKey<Boolean> ENTITY_FAILED_ONLY_IF_PREVIOUSLY_UP 
= ConfigKeys.newBooleanConfigKey("onlyReportIfPreviouslyUp", 
-        "Prevents the policy from emitting ENTITY_FAILED if the entity fails 
on startup (ie has never been up)", true);
-    
-    public static final ConfigKey<Boolean> MONITOR_SERVICE_PROBLEMS = 
ConfigKeys.newBooleanConfigKey("monitorServiceProblems", 
-        "Whether to monitor service problems, and emit on failures there (if 
set to false, this monitors only service up)", true);
-
-    @SetFromFlag("serviceOnFireStabilizationDelay")
-    public static final ConfigKey<Duration> 
SERVICE_ON_FIRE_STABILIZATION_DELAY = BasicConfigKey.builder(Duration.class)
-            .name("serviceOnFire.stabilizationDelay")
-            .description("Time period for which the service must be 
consistently down for (e.g. doesn't report down-up-down) before concluding 
ON_FIRE")
-            .defaultValue(Duration.ZERO)
-            .build();
-
-    @SetFromFlag("entityFailedStabilizationDelay")
-    public static final ConfigKey<Duration> ENTITY_FAILED_STABILIZATION_DELAY 
= BasicConfigKey.builder(Duration.class)
-            .name("entityFailed.stabilizationDelay")
-            .description("Time period for which the service must be 
consistently down for (e.g. doesn't report down-up-down) before emitting 
ENTITY_FAILED")
-            .defaultValue(Duration.ZERO)
-            .build();
-
-    @SetFromFlag("entityRecoveredStabilizationDelay")
-    public static final ConfigKey<Duration> 
ENTITY_RECOVERED_STABILIZATION_DELAY = BasicConfigKey.builder(Duration.class)
-            .name("entityRecovered.stabilizationDelay")
-            .description("For a failed entity, time period for which the 
service must be consistently up for (e.g. doesn't report up-down-up) before 
emitting ENTITY_RECOVERED")
-            .defaultValue(Duration.ZERO)
-            .build();
-
-    @SetFromFlag("entityFailedRepublishTime")
-    public static final ConfigKey<Duration> ENTITY_FAILED_REPUBLISH_TIME = 
BasicConfigKey.builder(Duration.class)
-            .name("entityFailed.republishTime")
-            .description("Publish failed state periodically at the specified 
intervals, null to disable.")
-            .build();
-
-    protected Long firstUpTime;
-    
-    protected Long currentFailureStartTime = null;
-    protected Long currentRecoveryStartTime = null;
-    
-    protected Long publishEntityFailedTime = null;
-    protected Long publishEntityRecoveredTime = null;
-    protected Long setEntityOnFireTime = null;
-    
-    protected LastPublished lastPublished = LastPublished.NONE;
-
-    private final AtomicBoolean executorQueued = new AtomicBoolean(false);
-    private volatile long executorTime = 0;
-
-    /**
-     * TODO Really don't want this mutex!
-     * ServiceStateLogic.setExpectedState() will call into `onEvent(null)`, so 
could get concurrent calls.
-     * How to handle that? I don't think `ServiceStateLogic.setExpectedState` 
should be making the call, but
-     * presumably that is their to remove a race condition so it is set before 
method returns. Caller shouldn't
-     * rely on that though.
-     * e.g. see 
`ServiceFailureDetectorTest.testNotifiedOfFailureOnStateOnFire`, where we get 
two notifications.
-     */
-    private final Object mutex = new Object();
-    
-    public ServiceFailureDetector() {
-        this(new ConfigBag());
-    }
-    
-    public ServiceFailureDetector(Map<String,?> flags) {
-        this(new ConfigBag().putAll(flags));
-    }
-    
-    public ServiceFailureDetector(ConfigBag configBag) {
-        // TODO hierarchy should use ConfigBag, and not change flags
-        super(configBag.getAllConfigMutable());
-    }
-    
-    @Override
-    public void onEvent(SensorEvent<Object> event) {
-        if (firstUpTime==null) {
-            if (event!=null && Attributes.SERVICE_UP.equals(event.getSensor()) 
&& Boolean.TRUE.equals(event.getValue())) {
-                firstUpTime = event.getTimestamp();
-            } else if (event == null && 
Boolean.TRUE.equals(entity.getAttribute(Attributes.SERVICE_UP))) {
-                // If this enricher is registered after the entity is up, then 
we'll get a "synthetic" onEvent(null) 
-                firstUpTime = System.currentTimeMillis();
-            }
-        }
-        
-        super.onEvent(event);
-    }
-    
-    @Override
-    protected void setActualState(Lifecycle state) {
-        long now = System.currentTimeMillis();
-
-        synchronized (mutex) {
-            if (state==Lifecycle.ON_FIRE) {
-                if (lastPublished == LastPublished.FAILED) {
-                    if (currentRecoveryStartTime != null) {
-                        if (LOG.isDebugEnabled()) LOG.debug("{} health-check 
for {}, component was recovering, now failing: {}", new Object[] {this, entity, 
getExplanation(state)});
-                        currentRecoveryStartTime = null;
-                        publishEntityRecoveredTime = null;
-                    } else {
-                        if (LOG.isTraceEnabled()) LOG.trace("{} health-check 
for {}, component still failed: {}", new Object[] {this, entity, 
getExplanation(state)});
-                    }
-                } else {
-                    if (firstUpTime == null && 
getConfig(ENTITY_FAILED_ONLY_IF_PREVIOUSLY_UP)) {
-                        // suppress; won't publish
-                    } else if (currentFailureStartTime == null) {
-                        if (LOG.isDebugEnabled()) LOG.debug("{} health-check 
for {}, component now failing: {}", new Object[] {this, entity, 
getExplanation(state)});
-                        currentFailureStartTime = now;
-                        publishEntityFailedTime = currentFailureStartTime + 
getConfig(ENTITY_FAILED_STABILIZATION_DELAY).toMilliseconds();
-                    } else {
-                        if (LOG.isTraceEnabled()) LOG.trace("{} health-check 
for {}, component continuing failing: {}", new Object[] {this, entity, 
getExplanation(state)});
-                    }
-                }
-                if (setEntityOnFireTime == null) {
-                    setEntityOnFireTime = now + 
getConfig(SERVICE_ON_FIRE_STABILIZATION_DELAY).toMilliseconds();
-                }
-                currentRecoveryStartTime = null;
-                publishEntityRecoveredTime = null;
-                
-            } else if (state == Lifecycle.RUNNING) {
-                if (lastPublished == LastPublished.FAILED) {
-                    if (currentRecoveryStartTime == null) {
-                        if (LOG.isDebugEnabled()) LOG.debug("{} health-check 
for {}, component now recovering: {}", new Object[] {this, entity, 
getExplanation(state)});
-                        currentRecoveryStartTime = now;
-                        publishEntityRecoveredTime = currentRecoveryStartTime 
+ getConfig(ENTITY_RECOVERED_STABILIZATION_DELAY).toMilliseconds();
-                    } else {
-                        if (LOG.isTraceEnabled()) LOG.trace("{} health-check 
for {}, component continuing recovering: {}", new Object[] {this, entity, 
getExplanation(state)});
-                    }
-                } else {
-                    if (currentFailureStartTime != null) {
-                        if (LOG.isDebugEnabled()) LOG.debug("{} health-check 
for {}, component was failing, now healthy: {}", new Object[] {this, entity, 
getExplanation(state)});
-                    } else {
-                        if (LOG.isTraceEnabled()) LOG.trace("{} health-check 
for {}, component still healthy: {}", new Object[] {this, entity, 
getExplanation(state)});
-                    }
-                }
-                currentFailureStartTime = null;
-                publishEntityFailedTime = null;
-                setEntityOnFireTime = null;
-                
-            } else {
-                if (LOG.isTraceEnabled()) LOG.trace("{} health-check for {}, 
in unconfirmed sate: {}", new Object[] {this, entity, getExplanation(state)});
-            }
-
-            long recomputeIn = Long.MAX_VALUE; // For whether to call 
recomputeAfterDelay
-            
-            if (publishEntityFailedTime != null) {
-                long delayBeforeCheck = publishEntityFailedTime - now;
-                if (delayBeforeCheck<=0) {
-                    if (LOG.isDebugEnabled()) LOG.debug("{} publishing failed 
(state={}; currentFailureStartTime={}; now={}", 
-                            new Object[] {this, state, 
Time.makeDateString(currentFailureStartTime), Time.makeDateString(now)});
-                    Duration republishDelay = 
getConfig(ENTITY_FAILED_REPUBLISH_TIME);
-                    if (republishDelay == null) {
-                        publishEntityFailedTime = null;
-                    } else {
-                        publishEntityFailedTime = now + 
republishDelay.toMilliseconds();
-                        recomputeIn = Math.min(recomputeIn, 
republishDelay.toMilliseconds());
-                    }
-                    lastPublished = LastPublished.FAILED;
-                    entity.emit(HASensors.ENTITY_FAILED, new 
HASensors.FailureDescriptor(entity, getFailureDescription(now)));
-                } else {
-                    recomputeIn = Math.min(recomputeIn, delayBeforeCheck);
-                }
-            } else if (publishEntityRecoveredTime != null) {
-                long delayBeforeCheck = publishEntityRecoveredTime - now;
-                if (delayBeforeCheck<=0) {
-                    if (LOG.isDebugEnabled()) LOG.debug("{} publishing 
recovered (state={}; currentRecoveryStartTime={}; now={}", 
-                            new Object[] {this, state, 
Time.makeDateString(currentRecoveryStartTime), Time.makeDateString(now)});
-                    publishEntityRecoveredTime = null;
-                    lastPublished = LastPublished.RECOVERED;
-                    entity.emit(HASensors.ENTITY_RECOVERED, new 
HASensors.FailureDescriptor(entity, null));
-                } else {
-                    recomputeIn = Math.min(recomputeIn, delayBeforeCheck);
-                }
-            }
-            
-            if (setEntityOnFireTime != null) {
-                long delayBeforeCheck = setEntityOnFireTime - now;
-                if (delayBeforeCheck<=0) {
-                    if (LOG.isDebugEnabled()) LOG.debug("{} setting on-fire, 
now that deferred period has passed (state={})", 
-                            new Object[] {this, state});
-                    setEntityOnFireTime = null;
-                    super.setActualState(state);
-                } else {
-                    recomputeIn = Math.min(recomputeIn, delayBeforeCheck);
-                }
-            } else {
-                super.setActualState(state);
-            }
-            
-            if (recomputeIn < Long.MAX_VALUE) {
-                recomputeAfterDelay(recomputeIn);
-            }
-        }
-    }
-
-    protected String getExplanation(Lifecycle state) {
-        Duration serviceFailedStabilizationDelay = 
getConfig(ENTITY_FAILED_STABILIZATION_DELAY);
-        Duration serviceRecoveredStabilizationDelay = 
getConfig(ENTITY_RECOVERED_STABILIZATION_DELAY);
-
-        return String.format("location=%s; status=%s; lastPublished=%s; 
timeNow=%s; "+
-                    "currentFailurePeriod=%s; currentRecoveryPeriod=%s",
-                entity.getLocations(), 
-                (state != null ? state : "<unreported>"),
-                lastPublished,
-                Time.makeDateString(System.currentTimeMillis()),
-                (currentFailureStartTime != null ? 
getTimeStringSince(currentFailureStartTime) : "<none>") + " (stabilization 
"+Time.makeTimeStringRounded(serviceFailedStabilizationDelay) + ")",
-                (currentRecoveryStartTime != null ? 
getTimeStringSince(currentRecoveryStartTime) : "<none>") + " (stabilization 
"+Time.makeTimeStringRounded(serviceRecoveredStabilizationDelay) + ")");
-    }
-    
-    private String getFailureDescription(long now) {
-        String description = null;
-        Map<String, Object> serviceProblems = 
entity.getAttribute(Attributes.SERVICE_PROBLEMS);
-        if (serviceProblems!=null && !serviceProblems.isEmpty()) {
-            Entry<String, Object> problem = 
serviceProblems.entrySet().iterator().next();
-            description = problem.getKey()+": "+problem.getValue();
-            if (serviceProblems.size()>1) {
-                description = serviceProblems.size()+" service problems, 
including "+description;
-            } else {
-                description = "service problem: "+description;
-            }
-        } else if 
(Boolean.FALSE.equals(entity.getAttribute(Attributes.SERVICE_UP))) {
-            description = "service not up";
-        } else {
-            description = "service failure detected";
-        }
-        if (publishEntityFailedTime!=null && currentFailureStartTime!=null && 
publishEntityFailedTime > currentFailureStartTime)
-            description = " (stabilized for "+Duration.of(now - 
currentFailureStartTime, TimeUnit.MILLISECONDS)+")";
-        return description;
-    }
-    
-    @SuppressWarnings({ "unchecked", "rawtypes" })
-    protected void recomputeAfterDelay(long delay) {
-        if (isRunning() && executorQueued.compareAndSet(false, true)) {
-            long now = System.currentTimeMillis();
-            delay = Math.max(0, Math.max(delay, (executorTime + 
MIN_PERIOD_BETWEEN_EXECS_MILLIS) - now));
-            if (LOG.isTraceEnabled()) LOG.trace("{} scheduling publish in 
{}ms", this, delay);
-            
-            Runnable job = new Runnable() {
-                @Override public void run() {
-                    try {
-                        executorTime = System.currentTimeMillis();
-                        executorQueued.set(false);
-
-                        onEvent(null);
-                        
-                    } catch (Exception e) {
-                        if (isRunning()) {
-                            LOG.error("Error in enricher "+this+": "+e, e);
-                        } else {
-                            if (LOG.isDebugEnabled()) LOG.debug("Error in 
enricher "+this+" (but no longer running): "+e, e);
-                        }
-                    } catch (Throwable t) {
-                        LOG.error("Error in enricher "+this+": "+t, t);
-                        throw Exceptions.propagate(t);
-                    }
-                }
-            };
-            
-            ScheduledTask task = new ScheduledTask(MutableMap.of("delay", 
Duration.of(delay, TimeUnit.MILLISECONDS)), new BasicTask(job));
-            ((EntityInternal)entity).getExecutionContext().submit(task);
-        }
-    }
-    
-    private String getTimeStringSince(Long time) {
-        return time == null ? null : 
Time.makeTimeStringRounded(System.currentTimeMillis() - time);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/brooklyn/policy/ha/ServiceReplacer.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/brooklyn/policy/ha/ServiceReplacer.java 
b/policy/src/main/java/brooklyn/policy/ha/ServiceReplacer.java
deleted file mode 100644
index 6bea1d4..0000000
--- a/policy/src/main/java/brooklyn/policy/ha/ServiceReplacer.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.policy.ha;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.brooklyn.api.catalog.Catalog;
-import org.apache.brooklyn.api.entity.Entity;
-import org.apache.brooklyn.api.entity.Group;
-import org.apache.brooklyn.api.entity.basic.EntityLocal;
-import org.apache.brooklyn.api.event.Sensor;
-import org.apache.brooklyn.api.event.SensorEvent;
-import org.apache.brooklyn.api.event.SensorEventListener;
-import org.apache.brooklyn.core.policy.basic.AbstractPolicy;
-import org.apache.brooklyn.core.util.config.ConfigBag;
-import org.apache.brooklyn.core.util.flags.SetFromFlag;
-
-import brooklyn.config.ConfigKey;
-import brooklyn.entity.basic.ConfigKeys;
-import brooklyn.entity.basic.Entities;
-import brooklyn.entity.basic.EntityInternal;
-import brooklyn.entity.basic.ServiceStateLogic.ServiceProblemsLogic;
-import brooklyn.entity.group.StopFailedRuntimeException;
-import brooklyn.entity.trait.MemberReplaceable;
-import brooklyn.event.basic.BasicConfigKey;
-import brooklyn.event.basic.BasicNotificationSensor;
-import brooklyn.policy.ha.HASensors.FailureDescriptor;
-import brooklyn.util.collections.MutableMap;
-import brooklyn.util.exceptions.Exceptions;
-
-import com.google.common.base.Ticker;
-import com.google.common.collect.Lists;
-
-/** attaches to a DynamicCluster and replaces a failed member in response to 
HASensors.ENTITY_FAILED or other sensor;
- * if this fails, it sets the Cluster state to on-fire */
-@Catalog(name="Service Replacer", description="HA policy for replacing a 
failed member of a group")
-public class ServiceReplacer extends AbstractPolicy {
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(ServiceReplacer.class);
-
-    // TODO if there are multiple failures perhaps we should abort quickly
-    
-    public static final BasicNotificationSensor<FailureDescriptor> 
ENTITY_REPLACEMENT_FAILED = new BasicNotificationSensor<FailureDescriptor>(
-            FailureDescriptor.class, "ha.entityFailed.replacement", "Indicates 
that an entity replacement attempt has failed");
-
-    @SetFromFlag("setOnFireOnFailure")
-    public static final ConfigKey<Boolean> SET_ON_FIRE_ON_FAILURE = 
ConfigKeys.newBooleanConfigKey("setOnFireOnFailure", "", true);
-    
-    /** monitors this sensor, by default ENTITY_RESTART_FAILED */
-    @SetFromFlag("failureSensorToMonitor")
-    @SuppressWarnings("rawtypes")
-    public static final ConfigKey<Sensor> FAILURE_SENSOR_TO_MONITOR = new 
BasicConfigKey<Sensor>(Sensor.class, "failureSensorToMonitor", "", 
ServiceRestarter.ENTITY_RESTART_FAILED); 
-
-    /** skips replace if replacement has failed this many times failure 
re-occurs within this time interval */
-    @SetFromFlag("failOnRecurringFailuresInThisDuration")
-    public static final ConfigKey<Long> 
FAIL_ON_RECURRING_FAILURES_IN_THIS_DURATION = ConfigKeys.newLongConfigKey(
-            "failOnRecurringFailuresInThisDuration", 
-            "abandon replace if replacement has failed many times within this 
time interval",
-            5*60*1000L);
-
-    /** skips replace if replacement has failed this many times failure 
re-occurs within this time interval */
-    @SetFromFlag("failOnNumRecurringFailures")
-    public static final ConfigKey<Integer> FAIL_ON_NUM_RECURRING_FAILURES = 
ConfigKeys.newIntegerConfigKey(
-            "failOnNumRecurringFailures", 
-            "abandon replace if replacement has failed this many times (100% 
of attempts) within the time interval",
-            5);
-
-    @SetFromFlag("ticker")
-    public static final ConfigKey<Ticker> TICKER = 
ConfigKeys.newConfigKey(Ticker.class,
-            "ticker", 
-            "A time source (defaults to system-clock, which is almost 
certainly what's wanted, except in tests)",
-            null);
-
-    protected final List<Long> consecutiveReplacementFailureTimes = 
Lists.newCopyOnWriteArrayList();
-    
-    public ServiceReplacer() {
-        this(new ConfigBag());
-    }
-    
-    public ServiceReplacer(Map<String,?> flags) {
-        this(new ConfigBag().putAll(flags));
-    }
-    
-    public ServiceReplacer(ConfigBag configBag) {
-        // TODO hierarchy should use ConfigBag, and not change flags
-        super(configBag.getAllConfigMutable());
-    }
-    
-    public ServiceReplacer(Sensor<?> failureSensorToMonitor) {
-        this(new ConfigBag().configure(FAILURE_SENSOR_TO_MONITOR, 
failureSensorToMonitor));
-    }
-
-    @Override
-    public void setEntity(final EntityLocal entity) {
-        checkArgument(entity instanceof MemberReplaceable, "ServiceReplacer 
must take a MemberReplaceable, not %s", entity);
-        Sensor<?> failureSensorToMonitor = 
checkNotNull(getConfig(FAILURE_SENSOR_TO_MONITOR), "failureSensorToMonitor");
-        
-        super.setEntity(entity);
-
-        subscribeToMembers((Group)entity, failureSensorToMonitor, new 
SensorEventListener<Object>() {
-                @Override public void onEvent(final SensorEvent<Object> event) 
{
-                    // Must execute in another thread - if we called 
entity.replaceMember in the event-listener's thread
-                    // then we'd block all other events being delivered to 
this entity's other subscribers.
-                    // Relies on synchronization of `onDetectedFailure`.
-                    // See same pattern used in ServiceRestarter.
-                    
-                    // TODO Could use 
BasicExecutionManager.setTaskSchedulerForTag to prevent race of two
-                    // events being received in rapid succession, and 
onDetectedFailure being executed out-of-order
-                    // for them; or could write events to a blocking queue and 
have onDetectedFailure read from that.
-                    
-                    if (isRunning()) {
-                        LOG.warn("ServiceReplacer notified; dispatching job 
for "+entity+" ("+event.getValue()+")");
-                        
((EntityInternal)entity).getExecutionContext().submit(MutableMap.of(), new 
Runnable() {
-                            @Override public void run() {
-                                onDetectedFailure(event);
-                            }});
-                    } else {
-                        LOG.warn("ServiceReplacer not running, so not acting 
on failure detected at "+entity+" ("+event.getValue()+", child of "+entity+")");
-                    }
-                }
-            });
-    }
-    
-    // TODO semaphores would be better to allow at-most-one-blocking behaviour
-    protected synchronized void onDetectedFailure(SensorEvent<Object> event) {
-        final Entity failedEntity = event.getSource();
-        final Object reason = event.getValue();
-        
-        if (isSuspended()) {
-            LOG.warn("ServiceReplacer suspended, so not acting on failure 
detected at "+failedEntity+" ("+reason+", child of "+entity+")");
-            return;
-        }
-
-        if (isRepeatedlyFailingTooMuch()) {
-            LOG.error("ServiceReplacer not acting on failure detected at 
"+failedEntity+" ("+reason+", child of "+entity+"), because too many recent 
replacement failures");
-            return;
-        }
-        
-        LOG.warn("ServiceReplacer acting on failure detected at 
"+failedEntity+" ("+reason+", child of "+entity+")");
-        
((EntityInternal)entity).getManagementSupport().getExecutionContext().submit(MutableMap.of(),
 new Runnable() {
-
-            @Override
-            public void run() {
-                try {
-                    Entities.invokeEffectorWithArgs(entity, entity, 
MemberReplaceable.REPLACE_MEMBER, failedEntity.getId()).get();
-                    consecutiveReplacementFailureTimes.clear();
-                } catch (Exception e) {
-                    if (Exceptions.getFirstThrowableOfType(e, 
StopFailedRuntimeException.class) != null) {
-                        LOG.info("ServiceReplacer: ignoring error reported 
from stopping failed node "+failedEntity);
-                        return;
-                    }
-                    onReplacementFailed("Replace failure 
("+Exceptions.collapseText(e)+") at "+entity+": "+reason);
-                }
-            }
-        });
-    }
-
-    private boolean isRepeatedlyFailingTooMuch() {
-        Integer failOnNumRecurringFailures = 
getConfig(FAIL_ON_NUM_RECURRING_FAILURES);
-        long failOnRecurringFailuresInThisDuration = 
getConfig(FAIL_ON_RECURRING_FAILURES_IN_THIS_DURATION);
-        long oldestPermitted = currentTimeMillis() - 
failOnRecurringFailuresInThisDuration;
-        
-        // trim old ones
-        for (Iterator<Long> iter = 
consecutiveReplacementFailureTimes.iterator(); iter.hasNext();) {
-            Long timestamp = iter.next();
-            if (timestamp < oldestPermitted) {
-                iter.remove();
-            } else {
-                break;
-            }
-        }
-        
-        return (consecutiveReplacementFailureTimes.size() >= 
failOnNumRecurringFailures);
-    }
-
-    protected long currentTimeMillis() {
-        Ticker ticker = getConfig(TICKER);
-        return (ticker == null) ? System.currentTimeMillis() : 
TimeUnit.NANOSECONDS.toMillis(ticker.read());
-    }
-    
-    protected void onReplacementFailed(String msg) {
-        LOG.warn("ServiceReplacer failed for "+entity+": "+msg);
-        consecutiveReplacementFailureTimes.add(currentTimeMillis());
-        
-        if (getConfig(SET_ON_FIRE_ON_FAILURE)) {
-            ServiceProblemsLogic.updateProblemsIndicator(entity, 
"ServiceReplacer", "replacement failed: "+msg);
-        }
-        entity.emit(ENTITY_REPLACEMENT_FAILED, new FailureDescriptor(entity, 
msg));
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/brooklyn/policy/ha/ServiceRestarter.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/brooklyn/policy/ha/ServiceRestarter.java 
b/policy/src/main/java/brooklyn/policy/ha/ServiceRestarter.java
deleted file mode 100644
index ab1359d..0000000
--- a/policy/src/main/java/brooklyn/policy/ha/ServiceRestarter.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.policy.ha;
-
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.brooklyn.api.catalog.Catalog;
-import org.apache.brooklyn.api.entity.basic.EntityLocal;
-import org.apache.brooklyn.api.event.Sensor;
-import org.apache.brooklyn.api.event.SensorEvent;
-import org.apache.brooklyn.api.event.SensorEventListener;
-import org.apache.brooklyn.core.policy.basic.AbstractPolicy;
-import org.apache.brooklyn.core.util.config.ConfigBag;
-import org.apache.brooklyn.core.util.flags.SetFromFlag;
-
-import brooklyn.config.ConfigKey;
-import brooklyn.entity.basic.ConfigKeys;
-import brooklyn.entity.basic.Entities;
-import brooklyn.entity.basic.EntityInternal;
-import brooklyn.entity.basic.Lifecycle;
-import brooklyn.entity.basic.ServiceStateLogic;
-import brooklyn.entity.trait.Startable;
-import brooklyn.event.basic.BasicNotificationSensor;
-import brooklyn.policy.ha.HASensors.FailureDescriptor;
-import brooklyn.util.collections.MutableMap;
-import brooklyn.util.javalang.JavaClassNames;
-import brooklyn.util.time.Duration;
-import brooklyn.util.time.Time;
-
-import com.google.common.base.Preconditions;
-
-/** attaches to a SoftwareProcess (or anything Startable, emitting 
ENTITY_FAILED or other configurable sensor),
- * and invokes restart on failure; 
- * if there is a subsequent failure within a configurable time interval, or if 
the restart fails,
- * this gives up and emits {@link #ENTITY_RESTART_FAILED} 
- */
-@Catalog(name="Service Restarter", description="HA policy for restarting a 
service automatically, "
-        + "and for emitting an events if the service repeatedly fails")
-public class ServiceRestarter extends AbstractPolicy {
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(ServiceRestarter.class);
-
-    public static final BasicNotificationSensor<FailureDescriptor> 
ENTITY_RESTART_FAILED = new BasicNotificationSensor<FailureDescriptor>(
-            FailureDescriptor.class, "ha.entityFailed.restart", "Indicates 
that an entity restart attempt has failed");
-
-    /** skips retry if a failure re-occurs within this time interval */
-    @SetFromFlag("failOnRecurringFailuresInThisDuration")
-    public static final ConfigKey<Duration> 
FAIL_ON_RECURRING_FAILURES_IN_THIS_DURATION = ConfigKeys.newConfigKey(
-            Duration.class, 
-            "failOnRecurringFailuresInThisDuration", 
-            "Reports entity as failed if it fails two or more times in this 
time window", 
-            Duration.minutes(3));
-
-    @SetFromFlag("setOnFireOnFailure")
-    public static final ConfigKey<Boolean> SET_ON_FIRE_ON_FAILURE = 
ConfigKeys.newBooleanConfigKey("setOnFireOnFailure", "", true);
-
-    /** monitors this sensor, by default ENTITY_FAILED */
-    @SetFromFlag("failureSensorToMonitor")
-    @SuppressWarnings({ "rawtypes", "unchecked" })
-    public static final ConfigKey<Sensor<?>> FAILURE_SENSOR_TO_MONITOR = 
(ConfigKey) ConfigKeys.newConfigKey(Sensor.class, "failureSensorToMonitor", "", 
HASensors.ENTITY_FAILED); 
-    
-    protected final AtomicReference<Long> lastFailureTime = new 
AtomicReference<Long>();
-
-    public ServiceRestarter() {
-        this(new ConfigBag());
-    }
-    
-    public ServiceRestarter(Map<String,?> flags) {
-        this(new ConfigBag().putAll(flags));
-    }
-    
-    public ServiceRestarter(ConfigBag configBag) {
-        // TODO hierarchy should use ConfigBag, and not change flags
-        super(configBag.getAllConfigMutable());
-        uniqueTag = 
JavaClassNames.simpleClassName(getClass())+":"+getConfig(FAILURE_SENSOR_TO_MONITOR).getName();
-    }
-    
-    public ServiceRestarter(Sensor<?> failureSensorToMonitor) {
-        this(new ConfigBag().configure(FAILURE_SENSOR_TO_MONITOR, 
failureSensorToMonitor));
-    }
-
-    @Override
-    public void setEntity(final EntityLocal entity) {
-        Preconditions.checkArgument(entity instanceof Startable, "Restarter 
must take a Startable, not "+entity);
-        
-        super.setEntity(entity);
-        
-        subscribe(entity, getConfig(FAILURE_SENSOR_TO_MONITOR), new 
SensorEventListener<Object>() {
-                @Override public void onEvent(final SensorEvent<Object> event) 
{
-                    // Must execute in another thread - if we called 
entity.restart in the event-listener's thread
-                    // then we'd block all other events being delivered to 
this entity's other subscribers.
-                    // Relies on synchronization of `onDetectedFailure`.
-                    // See same pattern used in ServiceReplacer.
-
-                    // TODO Could use 
BasicExecutionManager.setTaskSchedulerForTag to prevent race of two
-                    // events being received in rapid succession, and 
onDetectedFailure being executed out-of-order
-                    // for them; or could write events to a blocking queue and 
have onDetectedFailure read from that.
-                    
-                    if (isRunning()) {
-                        LOG.info("ServiceRestarter notified; dispatching job 
for "+entity+" ("+event.getValue()+")");
-                        
((EntityInternal)entity).getExecutionContext().submit(MutableMap.of(), new 
Runnable() {
-                            @Override public void run() {
-                                onDetectedFailure(event);
-                            }});
-                    } else {
-                        LOG.warn("ServiceRestarter not running, so not acting 
on failure detected at "+entity+" ("+event.getValue()+")");
-                    }
-                }
-            });
-    }
-    
-    // TODO semaphores would be better to allow at-most-one-blocking behaviour
-    // FIXME as this is called in message-dispatch (single threaded) we should 
do most of this in a new submitted task
-    // (as has been done in ServiceReplacer)
-    protected synchronized void onDetectedFailure(SensorEvent<Object> event) {
-        if (isSuspended()) {
-            LOG.warn("ServiceRestarter suspended, so not acting on failure 
detected at "+entity+" ("+event.getValue()+")");
-            return;
-        }
-
-        LOG.warn("ServiceRestarter acting on failure detected at "+entity+" 
("+event.getValue()+")");
-        long current = System.currentTimeMillis();
-        Long last = lastFailureTime.getAndSet(current);
-        long elapsed = last==null ? -1 : current-last;
-        if (elapsed>=0 && elapsed <= 
getConfig(FAIL_ON_RECURRING_FAILURES_IN_THIS_DURATION).toMilliseconds()) {
-            onRestartFailed("Restart failure (failed again after 
"+Time.makeTimeStringRounded(elapsed)+") at "+entity+": "+event.getValue());
-            return;
-        }
-        try {
-            ServiceStateLogic.setExpectedState(entity, Lifecycle.STARTING);
-            Entities.invokeEffector(entity, entity, Startable.RESTART).get();
-        } catch (Exception e) {
-            onRestartFailed("Restart failure (error "+e+") at "+entity+": 
"+event.getValue());
-        }
-    }
-
-    protected void onRestartFailed(String msg) {
-        LOG.warn("ServiceRestarter failed for "+entity+": "+msg);
-        if (getConfig(SET_ON_FIRE_ON_FAILURE)) {
-            ServiceStateLogic.setExpectedState(entity, Lifecycle.ON_FIRE);
-        }
-        entity.emit(ENTITY_RESTART_FAILED, new FailureDescriptor(entity, msg));
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/brooklyn/policy/ha/SshMachineFailureDetector.java
----------------------------------------------------------------------
diff --git 
a/policy/src/main/java/brooklyn/policy/ha/SshMachineFailureDetector.java 
b/policy/src/main/java/brooklyn/policy/ha/SshMachineFailureDetector.java
deleted file mode 100644
index 1fa9982..0000000
--- a/policy/src/main/java/brooklyn/policy/ha/SshMachineFailureDetector.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.policy.ha;
-
-import java.util.Map;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.brooklyn.api.catalog.Catalog;
-import org.apache.brooklyn.core.util.internal.ssh.SshTool;
-
-import brooklyn.config.ConfigKey;
-import brooklyn.entity.basic.ConfigKeys;
-import brooklyn.event.basic.BasicNotificationSensor;
-
-import org.apache.brooklyn.location.basic.Machines;
-import org.apache.brooklyn.location.basic.SshMachineLocation;
-
-import brooklyn.policy.ha.HASensors.FailureDescriptor;
-import brooklyn.util.exceptions.Exceptions;
-import brooklyn.util.guava.Maybe;
-import brooklyn.util.time.Duration;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-
-@Catalog(name="Ssh Connectivity Failure Detector", description="HA policy for 
monitoring an SshMachine, "
-        + "emitting an event if the connection is lost/restored")
-public class SshMachineFailureDetector extends AbstractFailureDetector {
-    private static final Logger LOG = 
LoggerFactory.getLogger(SshMachineFailureDetector.class);
-    public static final String DEFAULT_UNIQUE_TAG = 
"failureDetector.sshMachine.tag";
-
-    public static final BasicNotificationSensor<FailureDescriptor> 
CONNECTION_FAILED = HASensors.CONNECTION_FAILED;
-
-    public static final BasicNotificationSensor<FailureDescriptor> 
CONNECTION_RECOVERED = HASensors.CONNECTION_RECOVERED;
-
-    public static final ConfigKey<Duration> CONNECT_TIMEOUT = 
ConfigKeys.newDurationConfigKey(
-            "ha.sshConnection.timeout", "How long to wait for conneciton 
before declaring failure", Duration.TEN_SECONDS);
-
-    @Override
-    public void init() {
-        super.init();
-        if (config().getRaw(SENSOR_FAILED).isAbsent()) {
-            config().set(SENSOR_FAILED, CONNECTION_FAILED);
-        }
-        if (config().getRaw(SENSOR_RECOVERED).isAbsent()) {
-            config().set(SENSOR_RECOVERED, CONNECTION_RECOVERED);
-        }
-        if (config().getRaw(POLL_PERIOD).isAbsent()) {
-            config().set(POLL_PERIOD, Duration.ONE_MINUTE);
-        }
-        uniqueTag = DEFAULT_UNIQUE_TAG;
-    }
-
-    @Override
-    protected CalculatedStatus calculateStatus() {
-        Maybe<SshMachineLocation> sshMachineOption = 
Machines.findUniqueSshMachineLocation(entity.getLocations());
-        if (sshMachineOption.isPresent()) {
-            SshMachineLocation sshMachine = sshMachineOption.get();
-            try {
-                Duration timeout = config().get(CONNECT_TIMEOUT);
-                Map<String, ?> flags = ImmutableMap.of(
-                        SshTool.PROP_CONNECT_TIMEOUT.getName(), 
timeout.toMilliseconds(),
-                        SshTool.PROP_SESSION_TIMEOUT.getName(), 
timeout.toMilliseconds(),
-                        SshTool.PROP_SSH_TRIES.getName(), 1);
-                int exitCode = sshMachine.execCommands(flags, 
SshMachineFailureDetector.class.getName(), ImmutableList.of("exit"));
-                return new BasicCalculatedStatus(exitCode == 0, 
sshMachine.toString());
-            } catch (Exception e) {
-                Exceptions.propagateIfFatal(e);
-                boolean isFirstFailure = lastPublished != LastPublished.FAILED 
&& currentFailureStartTime == null;
-                if (isFirstFailure) {
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("Failed connecting to machine " + 
sshMachine, e);
-                    }
-                } else {
-                    if (LOG.isTraceEnabled()) {
-                        LOG.trace("Failed connecting to machine " + 
sshMachine, e);
-                    }
-                }
-                return new BasicCalculatedStatus(false, e.getMessage());
-            }
-        } else {
-            return new BasicCalculatedStatus(true, "no machine started, not 
complaining");
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/brooklyn/policy/loadbalancing/BalanceableContainer.java
----------------------------------------------------------------------
diff --git 
a/policy/src/main/java/brooklyn/policy/loadbalancing/BalanceableContainer.java 
b/policy/src/main/java/brooklyn/policy/loadbalancing/BalanceableContainer.java
deleted file mode 100644
index 0039685..0000000
--- 
a/policy/src/main/java/brooklyn/policy/loadbalancing/BalanceableContainer.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.policy.loadbalancing;
-
-import java.util.Set;
-
-import org.apache.brooklyn.api.entity.Entity;
-
-import brooklyn.config.ConfigKey;
-import brooklyn.entity.basic.AbstractGroup;
-import brooklyn.entity.basic.ConfigKeys;
-import brooklyn.event.basic.BasicNotificationSensor;
-import brooklyn.util.collections.QuorumCheck;
-import brooklyn.util.collections.QuorumCheck.QuorumChecks;
-
-/**
- * Contains worker items that can be moved between this container and others 
to effect load balancing.
- * Membership of a balanceable container does not imply a parent-child 
relationship in the Brooklyn
- * management sense.
- */
-public interface BalanceableContainer<ItemType extends Movable> extends 
Entity, AbstractGroup {
-    
-    public static BasicNotificationSensor<Entity> ITEM_ADDED = new 
BasicNotificationSensor<Entity>(
-            Entity.class, "balanceablecontainer.item.added", "Movable item 
added to balanceable container");
-    public static BasicNotificationSensor<Entity> ITEM_REMOVED = new 
BasicNotificationSensor<Entity>(
-            Entity.class, "balanceablecontainer.item.removed", "Movable item 
removed from balanceable container");
-    
-    public static final ConfigKey<QuorumCheck> UP_QUORUM_CHECK = 
ConfigKeys.newConfigKeyWithDefault(AbstractGroup.UP_QUORUM_CHECK, 
-        "Up check from members; default one for container overrides usual 
check to always return true, "
-        + "i.e. not block service up simply because the container is empty or 
something in the container has failed",
-        QuorumChecks.alwaysTrue());
-
-    public Set<ItemType> getBalanceableItems();
-    
-}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/brooklyn/policy/loadbalancing/BalanceablePoolModel.java
----------------------------------------------------------------------
diff --git 
a/policy/src/main/java/brooklyn/policy/loadbalancing/BalanceablePoolModel.java 
b/policy/src/main/java/brooklyn/policy/loadbalancing/BalanceablePoolModel.java
deleted file mode 100644
index 33f9e0b..0000000
--- 
a/policy/src/main/java/brooklyn/policy/loadbalancing/BalanceablePoolModel.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.policy.loadbalancing;
-
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.brooklyn.api.location.Location;
-
-/**
- * Captures the state of a balanceable cluster of containers and all their 
constituent items, including workrates,
- * for consumption by a {@link BalancingStrategy}.
- */
-public interface BalanceablePoolModel<ContainerType, ItemType> {
-    
-    // Attributes of the pool.
-    public String getName();
-    public int getPoolSize();
-    public Set<ContainerType> getPoolContents();
-    public double getPoolLowThreshold();
-    public double getPoolHighThreshold();
-    public double getCurrentPoolWorkrate();
-    public boolean isHot();
-    public boolean isCold();
-    
-    
-    // Attributes of containers and items.
-    public String getName(ContainerType container);
-    public Location getLocation(ContainerType container);
-    public double getLowThreshold(ContainerType container); // -1 for not 
known / invalid
-    public double getHighThreshold(ContainerType container); // -1 for not 
known / invalid
-    public double getTotalWorkrate(ContainerType container); // -1 for not 
known / invalid
-    public Map<ContainerType, Double> getContainerWorkrates(); // contains -1 
for items which are unknown
-    /** contains -1 instead of actual item workrate, for items which cannot be 
moved */
-    // @Nullable("null if the node is prevented from reporting and/or being 
adjusted, or has no data yet")
-    public Map<ItemType, Double> getItemWorkrates(ContainerType container);
-    public boolean isItemMoveable(ItemType item);
-    public boolean isItemAllowedIn(ItemType item, Location location);
-    
-    // Mutators for keeping the model in-sync with the observed world
-    public void onContainerAdded(ContainerType newContainer, double 
lowThreshold, double highThreshold);
-    public void onContainerRemoved(ContainerType oldContainer);
-    public void onItemAdded(ItemType item, ContainerType parentContainer);
-    public void onItemAdded(ItemType item, ContainerType parentContainer, 
boolean immovable);
-    public void onItemRemoved(ItemType item);
-    public void onItemWorkrateUpdated(ItemType item, double newValue);
-    public void onItemMoved(ItemType item, ContainerType targetContainer);
-}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/brooklyn/policy/loadbalancing/BalanceableWorkerPool.java
----------------------------------------------------------------------
diff --git 
a/policy/src/main/java/brooklyn/policy/loadbalancing/BalanceableWorkerPool.java 
b/policy/src/main/java/brooklyn/policy/loadbalancing/BalanceableWorkerPool.java
deleted file mode 100644
index c8d3a8b..0000000
--- 
a/policy/src/main/java/brooklyn/policy/loadbalancing/BalanceableWorkerPool.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.policy.loadbalancing;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import java.io.Serializable;
-
-import org.apache.brooklyn.api.entity.Entity;
-import org.apache.brooklyn.api.entity.Group;
-import org.apache.brooklyn.api.entity.proxying.ImplementedBy;
-
-import brooklyn.entity.trait.Resizable;
-import brooklyn.event.basic.BasicNotificationSensor;
-
-/**
- * Represents an elastic group of "container" entities, each of which is 
capable of hosting "item" entities that perform
- * work and consume the container's available resources (e.g. CPU or 
bandwidth). Auto-scaling and load-balancing policies can
- * be attached to this pool to provide dynamic elasticity based on workrates 
reported by the individual item entities.
- * <p>
- * The containers must be "up" in order to receive work, thus they must NOT 
follow the default enricher pattern
- * for groups which says that the group must be up to receive work.
- */
-@ImplementedBy(BalanceableWorkerPoolImpl.class)
-public interface BalanceableWorkerPool extends Entity, Resizable {
-
-    // FIXME Asymmetry between loadbalancing and followTheSun: ITEM_ADDED and 
ITEM_REMOVED in loadbalancing
-    // are of type ContainerItemPair, but in followTheSun it is just the 
`Entity item`.
-    
-    /** Encapsulates an item and a container; emitted for {@code ITEM_ADDED}, 
{@code ITEM_REMOVED} and
-     * {@code ITEM_MOVED} sensors.
-     */
-    public static class ContainerItemPair implements Serializable {
-        private static final long serialVersionUID = 1L;
-        public final BalanceableContainer<?> container;
-        public final Entity item;
-        
-        public ContainerItemPair(BalanceableContainer<?> container, Entity 
item) {
-            this.container = container;
-            this.item = checkNotNull(item);
-        }
-        
-        @Override
-        public String toString() {
-            return ""+item+" @ "+container;
-        }
-    }
-    
-    // Pool constituent notifications.
-    public static BasicNotificationSensor<Entity> CONTAINER_ADDED = new 
BasicNotificationSensor<Entity>(
-        Entity.class, "balanceablepool.container.added", "Container added to 
balanceable pool");
-    public static BasicNotificationSensor<Entity> CONTAINER_REMOVED = new 
BasicNotificationSensor<Entity>(
-        Entity.class, "balanceablepool.container.removed", "Container removed 
from balanceable pool");
-    public static BasicNotificationSensor<ContainerItemPair> ITEM_ADDED = new 
BasicNotificationSensor<ContainerItemPair>(
-        ContainerItemPair.class, "balanceablepool.item.added", "Item added to 
balanceable pool");
-    public static BasicNotificationSensor<ContainerItemPair> ITEM_REMOVED = 
new BasicNotificationSensor<ContainerItemPair>(
-        ContainerItemPair.class, "balanceablepool.item.removed", "Item removed 
from balanceable pool");
-    public static BasicNotificationSensor<ContainerItemPair> ITEM_MOVED = new 
BasicNotificationSensor<ContainerItemPair>(
-        ContainerItemPair.class, "balanceablepool.item.moved", "Item moved in 
balanceable pool to the given container");
-    
-    public void setResizable(Resizable resizable);
-    
-    public void setContents(Group containerGroup, Group itemGroup);
-    
-    public Group getContainerGroup();
-    
-    public Group getItemGroup();
-}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/brooklyn/policy/loadbalancing/BalanceableWorkerPoolImpl.java
----------------------------------------------------------------------
diff --git 
a/policy/src/main/java/brooklyn/policy/loadbalancing/BalanceableWorkerPoolImpl.java
 
b/policy/src/main/java/brooklyn/policy/loadbalancing/BalanceableWorkerPoolImpl.java
deleted file mode 100644
index b3f9633..0000000
--- 
a/policy/src/main/java/brooklyn/policy/loadbalancing/BalanceableWorkerPoolImpl.java
+++ /dev/null
@@ -1,185 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.policy.loadbalancing;
-
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
-
-import org.apache.brooklyn.api.entity.Entity;
-import org.apache.brooklyn.api.entity.Group;
-import org.apache.brooklyn.api.event.Sensor;
-import org.apache.brooklyn.api.event.SensorEvent;
-import org.apache.brooklyn.api.event.SensorEventListener;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import brooklyn.entity.basic.AbstractEntity;
-import brooklyn.entity.basic.AbstractGroup;
-import brooklyn.entity.trait.Resizable;
-import brooklyn.entity.trait.Startable;
-
-/**
- * @see BalanceableWorkerPool
- */
-public class BalanceableWorkerPoolImpl extends AbstractEntity implements 
BalanceableWorkerPool {
-
-    // FIXME Asymmetry between loadbalancing and followTheSun: ITEM_ADDED and 
ITEM_REMOVED in loadbalancing
-    // are of type ContainerItemPair, but in followTheSun it is just the 
`Entity item`.
-    
-    private static final Logger LOG = 
LoggerFactory.getLogger(BalanceableWorkerPool.class);
-    
-    private Group containerGroup;
-    private Group itemGroup;
-    private Resizable resizable;
-    
-    private final Set<Entity> containers = Collections.synchronizedSet(new 
HashSet<Entity>());
-    private final Set<Entity> items = Collections.synchronizedSet(new 
HashSet<Entity>());
-    
-    private final SensorEventListener<Object> eventHandler = new 
SensorEventListener<Object>() {
-        @Override
-        public void onEvent(SensorEvent<Object> event) {
-            if (LOG.isTraceEnabled()) LOG.trace("{} received event {}", 
BalanceableWorkerPoolImpl.this, event);
-            Entity source = event.getSource();
-            Object value = event.getValue();
-            Sensor<?> sensor = event.getSensor();
-            
-            if (sensor.equals(AbstractGroup.MEMBER_ADDED)) {
-                if (source.equals(containerGroup)) {
-                    onContainerAdded((BalanceableContainer<?>) value);
-                } else if (source.equals(itemGroup)) {
-                    onItemAdded((Entity)value);
-                } else {
-                    throw new IllegalStateException("unexpected event 
source="+source);
-                }
-            } else if (sensor.equals(AbstractGroup.MEMBER_REMOVED)) {
-                if (source.equals(containerGroup)) {
-                    onContainerRemoved((BalanceableContainer<?>) value);
-                } else if (source.equals(itemGroup)) {
-                    onItemRemoved((Entity) value);
-                } else {
-                    throw new IllegalStateException("unexpected event 
source="+source);
-                }
-            } else if (sensor.equals(Startable.SERVICE_UP)) {
-                // TODO What if start has failed? Is there a sensor to 
indicate that?
-                if ((Boolean)value) {
-                    onContainerUp((BalanceableContainer<?>) source);
-                } else {
-                    onContainerDown((BalanceableContainer<?>) source);
-                }
-            } else if (sensor.equals(Movable.CONTAINER)) {
-                onItemMoved(source, (BalanceableContainer<?>) value);
-            } else {
-                throw new IllegalStateException("Unhandled event type 
"+sensor+": "+event);
-            }
-        }
-    };
-    
-    public BalanceableWorkerPoolImpl() {
-    }
-
-    @Override
-    public void setResizable(Resizable resizable) {
-        this.resizable = resizable;
-    }
-    
-    @Override
-    public void setContents(Group containerGroup, Group itemGroup) {
-        this.containerGroup = containerGroup;
-        this.itemGroup = itemGroup;
-        if (resizable == null && containerGroup instanceof Resizable) 
resizable = (Resizable) containerGroup;
-        
-        subscribe(containerGroup, AbstractGroup.MEMBER_ADDED, eventHandler);
-        subscribe(containerGroup, AbstractGroup.MEMBER_REMOVED, eventHandler);
-        subscribe(itemGroup, AbstractGroup.MEMBER_ADDED, eventHandler);
-        subscribe(itemGroup, AbstractGroup.MEMBER_REMOVED, eventHandler);
-        
-        // Process extant containers and items
-        for (Entity existingContainer : containerGroup.getMembers()) {
-            onContainerAdded((BalanceableContainer<?>)existingContainer);
-        }
-        for (Entity existingItem : itemGroup.getMembers()) {
-            onItemAdded(existingItem);
-        }
-    }
-    
-    @Override
-    public Group getContainerGroup() {
-        return containerGroup;
-    }
-    
-    @Override
-    public Group getItemGroup() {
-        return itemGroup;
-    }
-
-    @Override
-    public Integer getCurrentSize() {
-        return containerGroup.getCurrentSize();
-    }
-    
-    @Override
-    public Integer resize(Integer desiredSize) {
-        if (resizable != null) return resizable.resize(desiredSize);
-        
-        throw new UnsupportedOperationException("Container group is not 
resizable, and no resizable supplied: "+containerGroup+" of type 
"+(containerGroup != null ? containerGroup.getClass().getCanonicalName() : 
null));
-    }
-    
-    private void onContainerAdded(BalanceableContainer<?> newContainer) {
-        subscribe(newContainer, Startable.SERVICE_UP, eventHandler);
-        if (!(newContainer instanceof Startable) || 
Boolean.TRUE.equals(newContainer.getAttribute(Startable.SERVICE_UP))) {
-            onContainerUp(newContainer);
-        }
-    }
-    
-    private void onContainerUp(BalanceableContainer<?> newContainer) {
-        if (containers.add(newContainer)) {
-            emit(CONTAINER_ADDED, newContainer);
-        }
-    }
-    
-    private void onContainerDown(BalanceableContainer<?> oldContainer) {
-        if (containers.remove(oldContainer)) {
-            emit(CONTAINER_REMOVED, oldContainer);
-        }
-    }
-    
-    private void onContainerRemoved(BalanceableContainer<?> oldContainer) {
-        unsubscribe(oldContainer);
-        onContainerDown(oldContainer);
-    }
-    
-    private void onItemAdded(Entity item) {
-        if (items.add(item)) {
-            subscribe(item, Movable.CONTAINER, eventHandler);
-            emit(ITEM_ADDED, new 
ContainerItemPair(item.getAttribute(Movable.CONTAINER), item));
-        }
-    }
-    
-    private void onItemRemoved(Entity item) {
-        if (items.remove(item)) {
-            unsubscribe(item);
-            emit(ITEM_REMOVED, new ContainerItemPair(null, item));
-        }
-    }
-    
-    private void onItemMoved(Entity item, BalanceableContainer<?> container) {
-        emit(ITEM_MOVED, new ContainerItemPair(container, item));
-    }
-}

Reply via email to