Repository: incubator-brooklyn
Updated Branches:
  refs/heads/master 6153b0a8f -> 04fc801d0


new policies: SshConnectionFailure, ConditionalSuspendPolicy

  * SshConnectionFailure (SshMachineFailureDetector) emits CONNECTION_FAILED if it
    can't make an ssh connection to the entity's machine
  * ConditionalSuspendPolicy suspends a target policy when it receives a sensor
    event (CONNECTION_FAILED by default) and resumes it on CONNECTION_RECOVERED


Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/dd0c2348
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/dd0c2348
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/dd0c2348

Branch: refs/heads/master
Commit: dd0c2348a2677526f29866b72aaa8cbe351c503a
Parents: 0ff4216
Author: Svetoslav Neykov <[email protected]>
Authored: Tue Mar 24 15:17:10 2015 +0200
Committer: Svetoslav Neykov <[email protected]>
Committed: Tue Apr 7 12:13:40 2015 +0300

----------------------------------------------------------------------
 .../policy/ha/AbstractFailureDetector.java      | 359 +++++++++++++++++++
 .../policy/ha/ConditionalSuspendPolicy.java     | 101 ++++++
 .../policy/ha/ConnectionFailureDetector.java    | 290 ++-------------
 .../policy/ha/SshMachineFailureDetector.java    |  99 +++++
 4 files changed, 597 insertions(+), 252 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/dd0c2348/policy/src/main/java/brooklyn/policy/ha/AbstractFailureDetector.java
----------------------------------------------------------------------
diff --git 
a/policy/src/main/java/brooklyn/policy/ha/AbstractFailureDetector.java 
b/policy/src/main/java/brooklyn/policy/ha/AbstractFailureDetector.java
new file mode 100644
index 0000000..6a1324c
--- /dev/null
+++ b/policy/src/main/java/brooklyn/policy/ha/AbstractFailureDetector.java
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package brooklyn.policy.ha;
+
+import static brooklyn.util.time.Time.makeTimeStringRounded;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import brooklyn.config.ConfigKey;
+import brooklyn.entity.basic.BrooklynTaskTags;
+import brooklyn.entity.basic.ConfigKeys;
+import brooklyn.entity.basic.EntityInternal;
+import brooklyn.entity.basic.EntityLocal;
+import brooklyn.event.Sensor;
+import brooklyn.management.Task;
+import brooklyn.policy.basic.AbstractPolicy;
+import brooklyn.policy.ha.HASensors.FailureDescriptor;
+import brooklyn.util.collections.MutableMap;
+import brooklyn.util.exceptions.Exceptions;
+import brooklyn.util.flags.SetFromFlag;
+import brooklyn.util.task.BasicTask;
+import brooklyn.util.task.ScheduledTask;
+import brooklyn.util.time.Duration;
+import brooklyn.util.time.Time;
+
+import com.google.common.reflect.TypeToken;
+
+public abstract class AbstractFailureDetector extends AbstractPolicy {
+
+    // TODO Remove duplication from ServiceFailureDetector, particularly for 
the stabilisation delays.
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(AbstractFailureDetector.class);
+
+    private static final long MIN_PERIOD_BETWEEN_EXECS_MILLIS = 100;
+
+    public static final ConfigKey<Duration> POLL_PERIOD = 
ConfigKeys.newDurationConfigKey(
+            "failureDetector.pollPeriod", "", Duration.ONE_SECOND);
+
+    @SetFromFlag("failedStabilizationDelay")
+    public static final ConfigKey<Duration> FAILED_STABILIZATION_DELAY = 
ConfigKeys.newDurationConfigKey(
+            "failureDetector.serviceFailedStabilizationDelay",
+            "Time period for which the health check consistently fails "
+                    + "(e.g. doesn't report failed-ok-faled) before concluding 
failure.",
+            Duration.ZERO);
+
+    @SetFromFlag("recoveredStabilizationDelay")
+    public static final ConfigKey<Duration> RECOVERED_STABILIZATION_DELAY = 
ConfigKeys.newDurationConfigKey(
+            "failureDetector.serviceRecoveredStabilizationDelay",
+            "Time period for which the health check succeeds continiually " +
+                    "(e.g. doesn't report ok-failed-ok) before concluding 
recovered",
+            Duration.ZERO);
+
+    @SuppressWarnings("serial")
+    public static final ConfigKey<Sensor<FailureDescriptor>> SENSOR_FAILED = 
ConfigKeys.newConfigKey(new TypeToken<Sensor<FailureDescriptor>>() {},
+            "failureDetector.sensor.fail", "A sensor which will indicate 
failure when set", HASensors.ENTITY_FAILED);
+
+    @SuppressWarnings("serial")
+    public static final ConfigKey<Sensor<FailureDescriptor>> SENSOR_RECOVERED 
= ConfigKeys.newConfigKey(new TypeToken<Sensor<FailureDescriptor>>() {},
+            "failureDetector.sensor.recover", "A sensor which will indicate 
recovery from failure when set", HASensors.ENTITY_RECOVERED);
+
+    public interface CalculatedStatus {
+        boolean isHealthy();
+        String getDescription();
+    }
+
+    private final class PublishJob implements Runnable {
+        @Override public void run() {
+            try {
+                executorTime = System.currentTimeMillis();
+                executorQueued.set(false);
+
+                publishNow();
+
+            } catch (Exception e) {
+                if (isRunning()) {
+                    LOG.error("Problem resizing: "+e, e);
+                } else {
+                    if (LOG.isDebugEnabled()) LOG.debug("Problem resizing, but 
no longer running: "+e, e);
+                }
+            } catch (Throwable t) {
+                LOG.error("Problem in service-failure-detector: "+t, t);
+                throw Exceptions.propagate(t);
+            }
+        }
+    }
+
+    private final class HealthPoller implements Runnable {
+        @Override
+        public void run() {
+            checkHealth();
+        }
+    }
+
+    private final class HealthPollingTaskFactory implements Callable<Task<?>> {
+        @Override
+        public Task<?> call() {
+            BasicTask<Void> task = new BasicTask<Void>(new HealthPoller());
+            BrooklynTaskTags.setTransient(task);
+            return task;
+        }
+    }
+
+    protected static class BasicCalculatedStatus implements CalculatedStatus {
+        private boolean healthy;
+        private String description;
+
+        public BasicCalculatedStatus(boolean healthy, String description) {
+            this.healthy = healthy;
+            this.description = description;
+        }
+
+        @Override
+        public boolean isHealthy() {
+            return healthy;
+        }
+
+        @Override
+        public String getDescription() {
+            return description;
+        }
+    }
+
+    public enum LastPublished {
+        NONE,
+        FAILED,
+        RECOVERED;
+    }
+
+    protected final AtomicReference<Long> stateLastGood = new 
AtomicReference<Long>();
+    protected final AtomicReference<Long> stateLastFail = new 
AtomicReference<Long>();
+
+    protected Long currentFailureStartTime = null;
+    protected Long currentRecoveryStartTime = null;
+
+    protected LastPublished lastPublished = LastPublished.NONE;
+
+    private final AtomicBoolean executorQueued = new AtomicBoolean(false);
+    private volatile long executorTime = 0;
+
+    private Callable<Task<?>> pollingTaskFactory = new 
HealthPollingTaskFactory();
+
+    private Task<?> scheduledTask;
+
+    protected abstract CalculatedStatus calculateStatus();
+
+    @Override
+    public void setEntity(EntityLocal entity) {
+        super.setEntity(entity);
+
+        if (isRunning()) {
+            doStartPolling();
+        }
+    }
+
+    @Override
+    public void suspend() {
+        scheduledTask.cancel(true);
+        super.suspend();
+    }
+
+    @Override
+    public void resume() {
+        currentFailureStartTime = null;
+        currentRecoveryStartTime = null;
+        lastPublished = LastPublished.NONE;
+        executorQueued.set(false);
+        executorTime = 0;
+
+        super.resume();
+        doStartPolling();
+    }
+
+    protected void doStartPolling() {
+        if (scheduledTask == null || scheduledTask.isDone()) {
+            ScheduledTask task = new ScheduledTask(MutableMap.of("period", 
getPollPeriod(), "displayName", getTaskName()), pollingTaskFactory);
+            scheduledTask = 
((EntityInternal)entity).getExecutionContext().submit(task);;
+        }
+    }
+
+    private String getTaskName() {
+        return getDisplayName();
+    }
+
+    protected Duration getPollPeriod() {
+        return getConfig(POLL_PERIOD);
+    }
+
+    protected Duration getFailedStabilizationDelay() {
+        return getConfig(FAILED_STABILIZATION_DELAY);
+    }
+
+    protected Duration getRecoveredStabilizationDelay() {
+        return getConfig(RECOVERED_STABILIZATION_DELAY);
+    }
+
+    protected Sensor<FailureDescriptor> getSensorFailed() {
+        return getConfig(SENSOR_FAILED);
+    }
+
+    protected Sensor<FailureDescriptor> getSensorRecovered() {
+        return getConfig(SENSOR_RECOVERED);
+    }
+
+    private synchronized void checkHealth() {
+        CalculatedStatus status = calculateStatus();
+        boolean healthy = status.isHealthy();
+        long now = System.currentTimeMillis();
+
+        if (healthy) {
+            stateLastGood.set(now);
+            if (lastPublished == LastPublished.FAILED) {
+                if (currentRecoveryStartTime == null) {
+                    LOG.info("{} check for {}, now recovering: {}", new 
Object[] {this, entity, getDescription(status)});
+                    currentRecoveryStartTime = now;
+                    schedulePublish();
+                } else {
+                    if (LOG.isTraceEnabled()) LOG.trace("{} check for {}, 
continuing recovering: {}", new Object[] {this, entity, 
getDescription(status)});
+                }
+            } else {
+                if (currentFailureStartTime != null) {
+                    LOG.info("{} check for {}, now healthy: {}", new Object[] 
{this, entity, getDescription(status)});
+                    currentFailureStartTime = null;
+                } else {
+                    if (LOG.isTraceEnabled()) LOG.trace("{} check for {}, 
still healthy: {}", new Object[] {this, entity, getDescription(status)});
+                }
+            }
+        } else {
+            stateLastFail.set(now);
+            if (lastPublished != LastPublished.FAILED) {
+                if (currentFailureStartTime == null) {
+                    LOG.info("{} check for {}, now failing: {}", new Object[] 
{this, entity, getDescription(status)});
+                    currentFailureStartTime = now;
+                    schedulePublish();
+                } else {
+                    if (LOG.isTraceEnabled()) LOG.trace("{} check for {}, 
continuing failing: {}", new Object[] {this, entity, getDescription(status)});
+                }
+            } else {
+                if (currentRecoveryStartTime != null) {
+                    LOG.info("{} check for {}, now failing: {}", new Object[] 
{this, entity, getDescription(status)});
+                    currentRecoveryStartTime = null;
+                } else {
+                    if (LOG.isTraceEnabled()) LOG.trace("{} check for {}, 
still failed: {}", new Object[] {this, entity, getDescription(status)});
+                }
+            }
+        }
+    }
+
+    protected void schedulePublish() {
+        schedulePublish(0);
+    }
+
+    protected void schedulePublish(long delay) {
+        if (isRunning() && executorQueued.compareAndSet(false, true)) {
+            long now = System.currentTimeMillis();
+            delay = Math.max(0, Math.max(delay, (executorTime + 
MIN_PERIOD_BETWEEN_EXECS_MILLIS) - now));
+            if (LOG.isTraceEnabled()) LOG.trace("{} scheduling publish in 
{}ms", this, delay);
+
+            Runnable job = new PublishJob();
+
+            ScheduledTask task = new ScheduledTask(MutableMap.of("delay", 
Duration.of(delay, TimeUnit.MILLISECONDS)), new BasicTask<Void>(job));
+            ((EntityInternal)entity).getExecutionContext().submit(task);
+        }
+    }
+
+    private synchronized void publishNow() {
+        if (!isRunning()) return;
+
+        CalculatedStatus calculatedStatus = calculateStatus();
+        boolean healthy = calculatedStatus.isHealthy();
+
+        Long lastUpTime = stateLastGood.get();
+        Long lastDownTime = stateLastFail.get();
+        long serviceFailedStabilizationDelay = 
getFailedStabilizationDelay().toMilliseconds();
+        long serviceRecoveredStabilizationDelay = 
getRecoveredStabilizationDelay().toMilliseconds();
+        long now = System.currentTimeMillis();
+
+        if (healthy) {
+            if (lastPublished == LastPublished.FAILED) {
+                // only publish if consistently up for 
serviceRecoveredStabilizationDelay
+                long currentRecoveryPeriod = getTimeDiff(now, 
currentRecoveryStartTime);
+                long sinceLastDownPeriod = getTimeDiff(now, lastDownTime);
+                if (currentRecoveryPeriod > serviceRecoveredStabilizationDelay 
&& sinceLastDownPeriod > serviceRecoveredStabilizationDelay) {
+                    String description = getDescription(calculatedStatus);
+                    LOG.warn("{} check for {}, publishing recovered: {}", new 
Object[] {this, entity, description});
+                    entity.emit(getSensorRecovered(), new 
HASensors.FailureDescriptor(entity, description));
+                    lastPublished = LastPublished.RECOVERED;
+                    currentFailureStartTime = null;
+                } else {
+                    long nextAttemptTime = 
Math.max(serviceRecoveredStabilizationDelay - currentRecoveryPeriod, 
serviceRecoveredStabilizationDelay - sinceLastDownPeriod);
+                    schedulePublish(nextAttemptTime);
+                }
+            }
+        } else {
+            if (lastPublished != LastPublished.FAILED) {
+                // only publish if consistently down for 
serviceFailedStabilizationDelay
+                long currentFailurePeriod = getTimeDiff(now, 
currentFailureStartTime);
+                long sinceLastUpPeriod = getTimeDiff(now, lastUpTime);
+                if (currentFailurePeriod > serviceFailedStabilizationDelay && 
sinceLastUpPeriod > serviceFailedStabilizationDelay) {
+                    String description = getDescription(calculatedStatus);
+                    LOG.warn("{} connectivity-check for {}, publishing failed: 
{}", new Object[] {this, entity, description});
+                    entity.emit(getSensorFailed(), new 
HASensors.FailureDescriptor(entity, description));
+                    lastPublished = LastPublished.FAILED;
+                    currentRecoveryStartTime = null;
+                } else {
+                    long nextAttemptTime = 
Math.max(serviceFailedStabilizationDelay - currentFailurePeriod, 
serviceFailedStabilizationDelay - sinceLastUpPeriod);
+                    schedulePublish(nextAttemptTime);
+                }
+            }
+        }
+    }
+
+    protected String getDescription(CalculatedStatus status) {
+        Long lastUpTime = stateLastGood.get();
+        Long lastDownTime = stateLastGood.get();
+        Duration serviceFailedStabilizationDelay = 
getFailedStabilizationDelay();
+        Duration serviceRecoveredStabilizationDelay = 
getRecoveredStabilizationDelay();
+
+        return String.format("%s; healthy=%s; timeNow=%s; lastUp=%s; 
lastDown=%s; lastPublished=%s; "+
+                    "currentFailurePeriod=%s; currentRecoveryPeriod=%s",
+                status.getDescription(),
+                status.isHealthy(),
+                Time.makeDateString(System.currentTimeMillis()),
+                (lastUpTime != null ? Time.makeDateString(lastUpTime) : 
"<never>"),
+                (lastDownTime != null ? Time.makeDateString(lastDownTime) : 
"<never>"),
+                lastPublished,
+                (currentFailureStartTime != null ? 
getTimeStringSince(currentFailureStartTime) : "<none>") + " (stabilization 
"+makeTimeStringRounded(serviceFailedStabilizationDelay) + ")",
+                (currentRecoveryStartTime != null ? 
getTimeStringSince(currentRecoveryStartTime) : "<none>") + " (stabilization 
"+makeTimeStringRounded(serviceRecoveredStabilizationDelay) + ")");
+    }
+
+    private long getTimeDiff(Long recent, Long previous) {
+        return (previous == null) ? recent : (recent - previous);
+    }
+
+    private String getTimeStringSince(Long time) {
+        return time == null ? null : 
Time.makeTimeStringRounded(System.currentTimeMillis() - time);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/dd0c2348/policy/src/main/java/brooklyn/policy/ha/ConditionalSuspendPolicy.java
----------------------------------------------------------------------
diff --git 
a/policy/src/main/java/brooklyn/policy/ha/ConditionalSuspendPolicy.java 
b/policy/src/main/java/brooklyn/policy/ha/ConditionalSuspendPolicy.java
new file mode 100644
index 0000000..6e37ef9
--- /dev/null
+++ b/policy/src/main/java/brooklyn/policy/ha/ConditionalSuspendPolicy.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package brooklyn.policy.ha;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import brooklyn.config.ConfigKey;
+import brooklyn.entity.basic.ConfigKeys;
+import brooklyn.entity.basic.EntityLocal;
+import brooklyn.event.Sensor;
+import brooklyn.event.SensorEvent;
+import brooklyn.event.SensorEventListener;
+import brooklyn.policy.Policy;
+import brooklyn.policy.basic.AbstractPolicy;
+import brooklyn.util.flags.SetFromFlag;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Suspends a target policy on the same entity when {@link #SUSPEND_SENSOR} is observed, and
+ * resumes it when {@link #RESUME_SENSOR} is observed. These default to
+ * {@link HASensors#CONNECTION_FAILED} and {@link HASensors#CONNECTION_RECOVERED}.
+ * <p>
+ * The target is given by {@link #SUSPEND_TARGET}. It can be a {@link Policy} instance, or a
+ * String matching the display name or class name of one of the entity's policies.
+ */
+public class ConditionalSuspendPolicy extends AbstractPolicy {
+    private static final Logger LOG = LoggerFactory.getLogger(ConditionalSuspendPolicy.class);
+
+    @SetFromFlag("suppressSensor")
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    public static final ConfigKey<Sensor<?>> SUSPEND_SENSOR = (ConfigKey) ConfigKeys.newConfigKey(Sensor.class,
+            "suppressSensor", "Sensor which will suppress the target policy", HASensors.CONNECTION_FAILED);
+
+    @SetFromFlag("resetSensor")
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    public static final ConfigKey<Sensor<?>> RESUME_SENSOR = (ConfigKey) ConfigKeys.newConfigKey(Sensor.class,
+            "resetSensor", "Resume target policy when this sensor is observed", HASensors.CONNECTION_RECOVERED);
+
+    @SetFromFlag("target")
+    public static final ConfigKey<Object> SUSPEND_TARGET = ConfigKeys.newConfigKey(Object.class,
+            "target", "The target policy to suspend. Either a direct reference, or the display name or class name of a policy from the same entity.");
+
+    /**
+     * Attaches to the entity, verifies the target policy can be resolved, and subscribes to the
+     * suspend/resume sensors.
+     *
+     * @throws NullPointerException if no target is configured or it cannot be found on the entity
+     */
+    @Override
+    public void setEntity(EntityLocal entity) {
+        super.setEntity(entity);
+        Object target = config().get(SUSPEND_TARGET);
+        Preconditions.checkNotNull(target, "Suspend target required");
+        Preconditions.checkNotNull(getTargetPolicy(), "Can't find target policy set in " + SUSPEND_TARGET.getName() + ": " + target);
+        subscribe();
+    }
+
+    /** Subscribes listeners that suspend/resume the target policy while this policy is running. */
+    private void subscribe() {
+        subscribe(entity, getConfig(SUSPEND_SENSOR), new SensorEventListener<Object>() {
+            @Override public void onEvent(final SensorEvent<Object> event) {
+                if (isRunning()) {
+                    Policy target = getTargetPolicy();
+                    if (target == null) {
+                        // target may have been removed from the entity since setEntity validated it
+                        LOG.warn("Can't find target policy {} to suspend, triggered by {} = {}", new Object[] {config().get(SUSPEND_TARGET), event.getSensor(), event.getValue()});
+                        return;
+                    }
+                    target.suspend();
+                    LOG.debug("Suspended policy {}, triggered by {} = {}", new Object[] {target, event.getSensor(), event.getValue()});
+                }
+            }
+        });
+        subscribe(entity, getConfig(RESUME_SENSOR), new SensorEventListener<Object>() {
+            @Override public void onEvent(final SensorEvent<Object> event) {
+                if (isRunning()) {
+                    Policy target = getTargetPolicy();
+                    if (target == null) {
+                        // target may have been removed from the entity since setEntity validated it
+                        LOG.warn("Can't find target policy {} to resume, triggered by {} = {}", new Object[] {config().get(SUSPEND_TARGET), event.getSensor(), event.getValue()});
+                        return;
+                    }
+                    target.resume();
+                    LOG.debug("Resumed policy {}, triggered by {} = {}", new Object[] {target, event.getSensor(), event.getValue()});
+                }
+            }
+        });
+    }
+
+    /**
+     * Resolves {@link #SUSPEND_TARGET} to a policy.
+     *
+     * @return the configured policy, or the first of the entity's policies whose display name or
+     *         class name equals the configured String; null if not configured or nothing matches
+     * @throws IllegalStateException if the configured target is neither a Policy nor a String
+     */
+    private Policy getTargetPolicy() {
+        Object target = config().get(SUSPEND_TARGET);
+        if (target == null) {
+            return null;
+        } else if (target instanceof Policy) {
+            return (Policy)target;
+        } else if (target instanceof String) {
+            for (Policy policy : entity.getPolicies()) {
+                // No way to set config values for keys NOT declared in the policy,
+                // so must use displayName as a generally available config value.
+                if (target.equals(policy.getDisplayName()) || target.equals(policy.getClass().getName())) {
+                    return policy;
+                }
+            }
+        } else {
+            throw new IllegalStateException("Unexpected type " + target.getClass() + " for target " + target);
+        }
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/dd0c2348/policy/src/main/java/brooklyn/policy/ha/ConnectionFailureDetector.java
----------------------------------------------------------------------
diff --git 
a/policy/src/main/java/brooklyn/policy/ha/ConnectionFailureDetector.java 
b/policy/src/main/java/brooklyn/policy/ha/ConnectionFailureDetector.java
index acd371d..a92b8a8 100644
--- a/policy/src/main/java/brooklyn/policy/ha/ConnectionFailureDetector.java
+++ b/policy/src/main/java/brooklyn/policy/ha/ConnectionFailureDetector.java
@@ -18,35 +18,17 @@
  */
 package brooklyn.policy.ha;
 
-import static brooklyn.util.time.Time.makeTimeStringRounded;
-
-import java.util.concurrent.Callable;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import brooklyn.catalog.Catalog;
 import brooklyn.config.ConfigKey;
-import brooklyn.entity.basic.BrooklynTaskTags;
 import brooklyn.entity.basic.ConfigKeys;
-import brooklyn.entity.basic.EntityInternal;
-import brooklyn.entity.basic.EntityLocal;
+import brooklyn.event.Sensor;
 import brooklyn.event.basic.BasicConfigKey;
 import brooklyn.event.basic.BasicNotificationSensor;
-import brooklyn.management.Task;
-import brooklyn.policy.basic.AbstractPolicy;
 import brooklyn.policy.ha.HASensors.FailureDescriptor;
-import brooklyn.util.collections.MutableMap;
-import brooklyn.util.exceptions.Exceptions;
 import brooklyn.util.flags.SetFromFlag;
+import brooklyn.util.guava.Maybe;
 import brooklyn.util.net.Networking;
-import brooklyn.util.task.BasicTask;
-import brooklyn.util.task.ScheduledTask;
 import brooklyn.util.time.Duration;
-import brooklyn.util.time.Time;
 
 import com.google.common.net.HostAndPort;
 
@@ -56,24 +38,12 @@ import com.google.common.net.HostAndPort;
  */
 @Catalog(name="Connection Failure Detector", description="HA policy for 
monitoring a host:port, "
         + "emitting an event if the connection is lost/restored")
-public class ConnectionFailureDetector extends AbstractPolicy {
-
-    // TODO Remove duplication from ServiceFailureDetector, particularly for 
the stabilisation delays.
-
-    public enum LastPublished {
-        NONE,
-        FAILED,
-        RECOVERED;
-    }
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(ConnectionFailureDetector.class);
-
-    private static final long MIN_PERIOD_BETWEEN_EXECS_MILLIS = 100;
+public class ConnectionFailureDetector extends AbstractFailureDetector {
 
     public static final ConfigKey<HostAndPort> ENDPOINT = 
ConfigKeys.newConfigKey(HostAndPort.class, 
"connectionFailureDetector.endpoint");
-    
+
     public static final ConfigKey<Duration> POLL_PERIOD = 
ConfigKeys.newConfigKey(Duration.class, "connectionFailureDetector.pollPeriod", 
"", Duration.ONE_SECOND);
-    
+
     public static final BasicNotificationSensor<FailureDescriptor> 
CONNECTION_FAILED = HASensors.CONNECTION_FAILED;
 
     public static final BasicNotificationSensor<FailureDescriptor> 
CONNECTION_RECOVERED = HASensors.CONNECTION_RECOVERED;
@@ -95,245 +65,61 @@ public class ConnectionFailureDetector extends AbstractPolicy {
             .defaultValue(Duration.ZERO)
             .build();
 
-    protected final AtomicReference<Long> connectionLastUp = new AtomicReference<Long>();
-    protected final AtomicReference<Long> connectionLastDown = new AtomicReference<Long>();
-    
-    protected Long currentFailureStartTime = null;
-    protected Long currentRecoveryStartTime = null;
-
-    protected LastPublished lastPublished = LastPublished.NONE;
-
-    private final AtomicBoolean executorQueued = new AtomicBoolean(false);
-    private volatile long executorTime = 0;
-
-    private Callable<Task<?>> pollingTaskFactory;
-
-    private Task<?> scheduledTask;
-    
-    public ConnectionFailureDetector() {
-    }
-    
     @Override
     public void init() {
+        super.init();
         getRequiredConfig(ENDPOINT); // just to confirm it's set, failing fast
-
-        pollingTaskFactory = new Callable<Task<?>>() {
-            @Override public Task<?> call() {
-                BasicTask<Void> task = new BasicTask<Void>(new Runnable() {
-                    @Override public void run() {
-                        checkHealth();
-                    }});
-                BrooklynTaskTags.setTransient(task);
-                return task;
-            }
-        };
-    }
-    
-    @Override
-    public void setEntity(EntityLocal entity) {
-        super.setEntity(entity);
-
-        if (isRunning()) {
-            doStartPolling();
+        if (config().getRaw(SENSOR_FAILED).isAbsent()) {
+            config().set(SENSOR_FAILED, CONNECTION_FAILED);
+        }
+        if (config().getRaw(SENSOR_RECOVERED).isAbsent()) {
+            config().set(SENSOR_RECOVERED, CONNECTION_RECOVERED);
         }
     }
 
     @Override
-    public void suspend() {
-        scheduledTask.cancel(true);
-        super.suspend();
+    protected CalculatedStatus calculateStatus() {
+        HostAndPort endpoint = getConfig(ENDPOINT);
+        boolean isHealthy = Networking.isReachable(endpoint);
+        return new BasicCalculatedStatus(isHealthy, "endpoint=" + endpoint);
     }
-    
+
+    //Persistence compatibility overrides
     @Override
-    public void resume() {
-        currentFailureStartTime = null;
-        currentRecoveryStartTime = null;
-        lastPublished = LastPublished.NONE;
-        executorQueued.set(false);
-        executorTime = 0;
-        
-        super.resume();
-        doStartPolling();
-    }
-    
-    protected void doStartPolling() {
-        if (scheduledTask == null || scheduledTask.isDone()) {
-            ScheduledTask task = new ScheduledTask(MutableMap.of("period", getConfig(POLL_PERIOD)), pollingTaskFactory);
-            scheduledTask = ((EntityInternal)entity).getExecutionContext().submit(task);
-        }
+    protected Duration getPollPeriod() {
+        return getConfig(POLL_PERIOD);
     }
-    
-    private Duration getConnectionFailedStabilizationDelay() {
+
+    @Override
+    protected Duration getFailedStabilizationDelay() {
         return getConfig(CONNECTION_FAILED_STABILIZATION_DELAY);
     }
 
-    private Duration getConnectionRecoveredStabilizationDelay() {
+    @Override
+    protected Duration getRecoveredStabilizationDelay() {
         return getConfig(CONNECTION_RECOVERED_STABILIZATION_DELAY);
     }
 
-    private synchronized void checkHealth() {
-        CalculatedStatus status = calculateStatus();
-        boolean connected = status.connected;
-        long now = System.currentTimeMillis();
-        
-        if (connected) {
-            connectionLastUp.set(now);
-            if (lastPublished == LastPublished.FAILED) {
-                if (currentRecoveryStartTime == null) {
-                    LOG.info("{} connectivity-check for {}, now recovering: {}", new Object[] {this, entity, status.getDescription()});
-                    currentRecoveryStartTime = now;
-                    schedulePublish();
-                } else {
-                    if (LOG.isTraceEnabled()) LOG.trace("{} connectivity-check for {}, continuing recovering: {}", new Object[] {this, entity, status.getDescription()});
-                }
-            } else {
-                if (currentFailureStartTime != null) {
-                    LOG.info("{} connectivity-check for {}, now healthy: {}", new Object[] {this, entity, status.getDescription()});
-                    currentFailureStartTime = null;
-                } else {
-                    if (LOG.isTraceEnabled()) LOG.trace("{} connectivity-check for {}, still healthy: {}", new Object[] {this, entity, status.getDescription()});
-                }
-            }
+    @SuppressWarnings("unchecked")
+    @Override
+    protected Sensor<FailureDescriptor> getSensorFailed() {
+        Maybe<Object> sensorFailed = config().getRaw(SENSOR_FAILED);
+        if (sensorFailed.isPresent()) {
+            return (Sensor<FailureDescriptor>)sensorFailed.get();
         } else {
-            connectionLastDown.set(now);
-            if (lastPublished != LastPublished.FAILED) {
-                if (currentFailureStartTime == null) {
-                    LOG.info("{} connectivity-check for {}, now failing: {}", new Object[] {this, entity, status.getDescription()});
-                    currentFailureStartTime = now;
-                    schedulePublish();
-                } else {
-                    if (LOG.isTraceEnabled()) LOG.trace("{} connectivity-check for {}, continuing failing: {}", new Object[] {this, entity, status.getDescription()});
-                }
-            } else {
-                if (currentRecoveryStartTime != null) {
-                    LOG.info("{} connectivity-check for {}, now failing: {}", new Object[] {this, entity, status.getDescription()});
-                    currentRecoveryStartTime = null;
-                } else {
-                    if (LOG.isTraceEnabled()) LOG.trace("{} connectivity-check for {}, still failed: {}", new Object[] {this, entity, status.getDescription()});
-                }
-            }
+            return CONNECTION_FAILED;
         }
     }
-    
-    protected CalculatedStatus calculateStatus() {
-        return new CalculatedStatus();
-    }
-
-    protected void schedulePublish() {
-        schedulePublish(0);
-    }
-    
-    protected void schedulePublish(long delay) {
-        if (isRunning() && executorQueued.compareAndSet(false, true)) {
-            long now = System.currentTimeMillis();
-            delay = Math.max(0, Math.max(delay, (executorTime + MIN_PERIOD_BETWEEN_EXECS_MILLIS) - now));
-            if (LOG.isTraceEnabled()) LOG.trace("{} scheduling publish in {}ms", this, delay);
-            
-            Runnable job = new Runnable() {
-                @Override public void run() {
-                    try {
-                        executorTime = System.currentTimeMillis();
-                        executorQueued.set(false);
 
-                        publishNow();
-                        
-                    } catch (Exception e) {
-                        if (isRunning()) {
-                            LOG.error("Problem resizing: "+e, e);
-                        } else {
-                            if (LOG.isDebugEnabled()) LOG.debug("Problem resizing, but no longer running: "+e, e);
-                        }
-                    } catch (Throwable t) {
-                        LOG.error("Problem in service-failure-detector: "+t, t);
-                        throw Exceptions.propagate(t);
-                    }
-                }
-            };
-            
-            ScheduledTask task = new ScheduledTask(MutableMap.of("delay", Duration.of(delay, TimeUnit.MILLISECONDS)), new BasicTask(job));
-            ((EntityInternal)entity).getExecutionContext().submit(task);
-        }
-    }
-    
-    private synchronized void publishNow() {
-        if (!isRunning()) return;
-        
-        CalculatedStatus calculatedStatus = calculateStatus();
-        boolean connected = calculatedStatus.connected;
-        
-        Long lastUpTime = connectionLastUp.get();
-        Long lastDownTime = connectionLastDown.get();
-        long serviceFailedStabilizationDelay = getConnectionFailedStabilizationDelay().toMilliseconds();
-        long serviceRecoveredStabilizationDelay = getConnectionRecoveredStabilizationDelay().toMilliseconds();
-        long now = System.currentTimeMillis();
-        
-        if (connected) {
-            if (lastPublished == LastPublished.FAILED) {
-                // only publish if consistently up for serviceRecoveredStabilizationDelay
-                long currentRecoveryPeriod = getTimeDiff(now, currentRecoveryStartTime);
-                long sinceLastDownPeriod = getTimeDiff(now, lastDownTime);
-                if (currentRecoveryPeriod > serviceRecoveredStabilizationDelay && sinceLastDownPeriod > serviceRecoveredStabilizationDelay) {
-                    String description = calculatedStatus.getDescription();
-                    LOG.warn("{} connectivity-check for {}, publishing recovered: {}", new Object[] {this, entity, description});
-                    entity.emit(CONNECTION_RECOVERED, new HASensors.FailureDescriptor(entity, description));
-                    lastPublished = LastPublished.RECOVERED;
-                    currentFailureStartTime = null;
-                } else {
-                    long nextAttemptTime = Math.max(serviceRecoveredStabilizationDelay - currentRecoveryPeriod, serviceRecoveredStabilizationDelay - sinceLastDownPeriod);
-                    schedulePublish(nextAttemptTime);
-                }
-            }
+    @SuppressWarnings("unchecked")
+    @Override
+    protected Sensor<FailureDescriptor> getSensorRecovered() {
+        Maybe<Object> sensorRecovered = config().getRaw(SENSOR_RECOVERED);
+        if (sensorRecovered.isPresent()) {
+            return (Sensor<FailureDescriptor>)sensorRecovered.get();
         } else {
-            if (lastPublished != LastPublished.FAILED) {
-                // only publish if consistently down for serviceFailedStabilizationDelay
-                long currentFailurePeriod = getTimeDiff(now, currentFailureStartTime);
-                long sinceLastUpPeriod = getTimeDiff(now, lastUpTime);
-                if (currentFailurePeriod > serviceFailedStabilizationDelay && sinceLastUpPeriod > serviceFailedStabilizationDelay) {
-                    String description = calculatedStatus.getDescription();
-                    LOG.warn("{} connectivity-check for {}, publishing failed: {}", new Object[] {this, entity, description});
-                    entity.emit(CONNECTION_FAILED, new HASensors.FailureDescriptor(entity, description));
-                    lastPublished = LastPublished.FAILED;
-                    currentRecoveryStartTime = null;
-                } else {
-                    long nextAttemptTime = Math.max(serviceFailedStabilizationDelay - currentFailurePeriod, serviceFailedStabilizationDelay - sinceLastUpPeriod);
-                    schedulePublish(nextAttemptTime);
-                }
-            }
+            return CONNECTION_RECOVERED;
         }
     }
 
-    public class CalculatedStatus {
-        public final boolean connected;
-        
-        public CalculatedStatus() {
-            HostAndPort endpoint = getConfig(ENDPOINT);
-            connected = Networking.isReachable(endpoint);
-        }
-        
-        public String getDescription() {
-            Long lastUpTime = connectionLastUp.get();
-            Long lastDownTime = connectionLastDown.get();
-            Duration serviceFailedStabilizationDelay = getConnectionFailedStabilizationDelay();
-            Duration serviceRecoveredStabilizationDelay = getConnectionRecoveredStabilizationDelay();
-
-            return String.format("endpoint=%s; connected=%s; timeNow=%s; lastUp=%s; lastDown=%s; lastPublished=%s; "+
-                        "currentFailurePeriod=%s; currentRecoveryPeriod=%s",
-                    getConfig(ENDPOINT), 
-                    connected,
-                    Time.makeDateString(System.currentTimeMillis()),
-                    (lastUpTime != null ? Time.makeDateString(lastUpTime) : "<never>"),
-                    (lastDownTime != null ? Time.makeDateString(lastDownTime) : "<never>"),
-                    lastPublished,
-                    (currentFailureStartTime != null ? getTimeStringSince(currentFailureStartTime) : "<none>") + " (stabilization "+makeTimeStringRounded(serviceFailedStabilizationDelay) + ")",
-                    (currentRecoveryStartTime != null ? getTimeStringSince(currentRecoveryStartTime) : "<none>") + " (stabilization "+makeTimeStringRounded(serviceRecoveredStabilizationDelay) + ")");
-        }
-    }
-    
-    private long getTimeDiff(Long recent, Long previous) {
-        return (previous == null) ? recent : (recent - previous);
-    }
-    
-    private String getTimeStringSince(Long time) {
-        return time == null ? null : Time.makeTimeStringRounded(System.currentTimeMillis() - time);
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/dd0c2348/policy/src/main/java/brooklyn/policy/ha/SshMachineFailureDetector.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/brooklyn/policy/ha/SshMachineFailureDetector.java b/policy/src/main/java/brooklyn/policy/ha/SshMachineFailureDetector.java
new file mode 100644
index 0000000..435592e
--- /dev/null
+++ b/policy/src/main/java/brooklyn/policy/ha/SshMachineFailureDetector.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package brooklyn.policy.ha;
+
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import brooklyn.catalog.Catalog;
+import brooklyn.config.ConfigKey;
+import brooklyn.entity.basic.ConfigKeys;
+import brooklyn.event.basic.BasicNotificationSensor;
+import brooklyn.location.basic.Machines;
+import brooklyn.location.basic.SshMachineLocation;
+import brooklyn.policy.ha.AbstractFailureDetector.LastPublished;
+import brooklyn.policy.ha.HASensors.FailureDescriptor;
+import brooklyn.util.exceptions.Exceptions;
+import brooklyn.util.guava.Maybe;
+import brooklyn.util.internal.ssh.SshTool;
+import brooklyn.util.time.Duration;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+
+@Catalog(name="Ssh Connectivity Failure Detector", description="HA policy for monitoring an SshMachine, "
+        + "emitting an event if the connection is lost/restored")
+public class SshMachineFailureDetector extends AbstractFailureDetector {
+    private static final Logger LOG = LoggerFactory.getLogger(SshMachineFailureDetector.class);
+
+    public static final BasicNotificationSensor<FailureDescriptor> CONNECTION_FAILED = HASensors.CONNECTION_FAILED;
+
+    public static final BasicNotificationSensor<FailureDescriptor> CONNECTION_RECOVERED = HASensors.CONNECTION_RECOVERED;
+
+    public static final ConfigKey<Duration> CONNECT_TIMEOUT = ConfigKeys.newDurationConfigKey(
+            "ha.sshConnection.timeout", "How long to wait for connection before declaring failure", Duration.TEN_SECONDS);
+
+    @Override
+    public void init() {
+        super.init();
+        if (config().getRaw(SENSOR_FAILED).isAbsent()) {
+            config().set(SENSOR_FAILED, CONNECTION_FAILED);
+        }
+        if (config().getRaw(SENSOR_RECOVERED).isAbsent()) {
+            config().set(SENSOR_RECOVERED, CONNECTION_RECOVERED);
+        }
+        if (config().getRaw(POLL_PERIOD).isAbsent()) {
+            config().set(POLL_PERIOD, Duration.ONE_MINUTE);
+        }
+    }
+
+    @Override
+    protected CalculatedStatus calculateStatus() {
+        Maybe<SshMachineLocation> sshMachineOption = Machines.findUniqueSshMachineLocation(entity.getLocations());
+        if (sshMachineOption.isPresent()) {
+            SshMachineLocation sshMachine = sshMachineOption.get();
+            try {
+                Duration timeout = config().get(CONNECT_TIMEOUT);
+                Map<String, ?> flags = ImmutableMap.of(
+                        SshTool.PROP_CONNECT_TIMEOUT.getName(), timeout.toMilliseconds(),
+                        SshTool.PROP_SESSION_TIMEOUT.getName(), timeout.toMilliseconds(),
+                        SshTool.PROP_SSH_TRIES.getName(), 1);
+                int exitCode = sshMachine.execCommands(flags, SshMachineFailureDetector.class.getName(), ImmutableList.of("exit"));
+                return new BasicCalculatedStatus(exitCode == 0, sshMachine.toString());
+            } catch (Exception e) {
+                Exceptions.propagateIfFatal(e);
+                boolean isFirstFailure = lastPublished != LastPublished.FAILED && currentFailureStartTime == null;
+                if (isFirstFailure) {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Failed connecting to machine " + sshMachine, e);
+                    }
+                } else {
+                    if (LOG.isTraceEnabled()) {
+                        LOG.trace("Failed connecting to machine " + sshMachine, e);
+                    }
+                }
+                return new BasicCalculatedStatus(false, e.getMessage());
+            }
+        } else {
+            return new BasicCalculatedStatus(true, "no machine started, not complaining");
+        }
+    }
+}

Reply via email to