Repository: incubator-brooklyn Updated Branches: refs/heads/master 6153b0a8f -> 04fc801d0
new policies: SshConnectionFailure, ConditionalSuspendPolicy * SshConnectionFailure emits CONNECTION_FAILURE if it can't make ssh connection to the machine of the entity * ConditionalSuspendPolicy suspends a target policy if it receives a sensor event (CONNECTION_FAILURE by default) Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/dd0c2348 Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/dd0c2348 Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/dd0c2348 Branch: refs/heads/master Commit: dd0c2348a2677526f29866b72aaa8cbe351c503a Parents: 0ff4216 Author: Svetoslav Neykov <[email protected]> Authored: Tue Mar 24 15:17:10 2015 +0200 Committer: Svetoslav Neykov <[email protected]> Committed: Tue Apr 7 12:13:40 2015 +0300 ---------------------------------------------------------------------- .../policy/ha/AbstractFailureDetector.java | 359 +++++++++++++++++++ .../policy/ha/ConditionalSuspendPolicy.java | 101 ++++++ .../policy/ha/ConnectionFailureDetector.java | 290 ++------------- .../policy/ha/SshMachineFailureDetector.java | 99 +++++ 4 files changed, 597 insertions(+), 252 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/dd0c2348/policy/src/main/java/brooklyn/policy/ha/AbstractFailureDetector.java ---------------------------------------------------------------------- diff --git a/policy/src/main/java/brooklyn/policy/ha/AbstractFailureDetector.java b/policy/src/main/java/brooklyn/policy/ha/AbstractFailureDetector.java new file mode 100644 index 0000000..6a1324c --- /dev/null +++ b/policy/src/main/java/brooklyn/policy/ha/AbstractFailureDetector.java @@ -0,0 +1,359 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package brooklyn.policy.ha; + +import static brooklyn.util.time.Time.makeTimeStringRounded; + +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import brooklyn.config.ConfigKey; +import brooklyn.entity.basic.BrooklynTaskTags; +import brooklyn.entity.basic.ConfigKeys; +import brooklyn.entity.basic.EntityInternal; +import brooklyn.entity.basic.EntityLocal; +import brooklyn.event.Sensor; +import brooklyn.management.Task; +import brooklyn.policy.basic.AbstractPolicy; +import brooklyn.policy.ha.HASensors.FailureDescriptor; +import brooklyn.util.collections.MutableMap; +import brooklyn.util.exceptions.Exceptions; +import brooklyn.util.flags.SetFromFlag; +import brooklyn.util.task.BasicTask; +import brooklyn.util.task.ScheduledTask; +import brooklyn.util.time.Duration; +import brooklyn.util.time.Time; + +import com.google.common.reflect.TypeToken; + +public abstract class AbstractFailureDetector extends AbstractPolicy { + + // TODO Remove duplication from ServiceFailureDetector, particularly for the stabilisation delays. 
+ + private static final Logger LOG = LoggerFactory.getLogger(AbstractFailureDetector.class); + + private static final long MIN_PERIOD_BETWEEN_EXECS_MILLIS = 100; + + public static final ConfigKey<Duration> POLL_PERIOD = ConfigKeys.newDurationConfigKey( + "failureDetector.pollPeriod", "", Duration.ONE_SECOND); + + @SetFromFlag("failedStabilizationDelay") + public static final ConfigKey<Duration> FAILED_STABILIZATION_DELAY = ConfigKeys.newDurationConfigKey( + "failureDetector.serviceFailedStabilizationDelay", + "Time period for which the health check consistently fails " + + "(e.g. doesn't report failed-ok-faled) before concluding failure.", + Duration.ZERO); + + @SetFromFlag("recoveredStabilizationDelay") + public static final ConfigKey<Duration> RECOVERED_STABILIZATION_DELAY = ConfigKeys.newDurationConfigKey( + "failureDetector.serviceRecoveredStabilizationDelay", + "Time period for which the health check succeeds continiually " + + "(e.g. doesn't report ok-failed-ok) before concluding recovered", + Duration.ZERO); + + @SuppressWarnings("serial") + public static final ConfigKey<Sensor<FailureDescriptor>> SENSOR_FAILED = ConfigKeys.newConfigKey(new TypeToken<Sensor<FailureDescriptor>>() {}, + "failureDetector.sensor.fail", "A sensor which will indicate failure when set", HASensors.ENTITY_FAILED); + + @SuppressWarnings("serial") + public static final ConfigKey<Sensor<FailureDescriptor>> SENSOR_RECOVERED = ConfigKeys.newConfigKey(new TypeToken<Sensor<FailureDescriptor>>() {}, + "failureDetector.sensor.recover", "A sensor which will indicate recovery from failure when set", HASensors.ENTITY_RECOVERED); + + public interface CalculatedStatus { + boolean isHealthy(); + String getDescription(); + } + + private final class PublishJob implements Runnable { + @Override public void run() { + try { + executorTime = System.currentTimeMillis(); + executorQueued.set(false); + + publishNow(); + + } catch (Exception e) { + if (isRunning()) { + LOG.error("Problem resizing: "+e, e); 
+                } else {
+                    if (LOG.isDebugEnabled()) LOG.debug("Problem resizing, but no longer running: "+e, e);
+                }
+            } catch (Throwable t) {
+                LOG.error("Problem in service-failure-detector: "+t, t);
+                throw Exceptions.propagate(t);
+            }
+        }
+    }
+
+    /** Body of each polling task: runs a single health check. */
+    private final class HealthPoller implements Runnable {
+        @Override
+        public void run() {
+            checkHealth();
+        }
+    }
+
+    /** Creates a fresh, transient-tagged task wrapping a {@link HealthPoller} on every call. */
+    private final class HealthPollingTaskFactory implements Callable<Task<?>> {
+        @Override
+        public Task<?> call() {
+            BasicTask<Void> task = new BasicTask<Void>(new HealthPoller());
+            BrooklynTaskTags.setTransient(task);
+            return task;
+        }
+    }
+
+    /** Simple value holder implementing {@link CalculatedStatus}. */
+    protected static class BasicCalculatedStatus implements CalculatedStatus {
+        private boolean healthy;
+        private String description;
+
+        public BasicCalculatedStatus(boolean healthy, String description) {
+            this.healthy = healthy;
+            this.description = description;
+        }
+
+        @Override
+        public boolean isHealthy() {
+            return healthy;
+        }
+
+        @Override
+        public String getDescription() {
+            return description;
+        }
+    }
+
+    public enum LastPublished {
+        NONE,
+        FAILED,
+        RECOVERED;
+    }
+
+    protected final AtomicReference<Long> stateLastGood = new AtomicReference<Long>();
+    protected final AtomicReference<Long> stateLastFail = new AtomicReference<Long>();
+
+    protected Long currentFailureStartTime = null;
+    protected Long currentRecoveryStartTime = null;
+
+    protected LastPublished lastPublished = LastPublished.NONE;
+
+    private final AtomicBoolean executorQueued = new AtomicBoolean(false);
+    private volatile long executorTime = 0;
+
+    private Callable<Task<?>> pollingTaskFactory = new HealthPollingTaskFactory();
+
+    private Task<?> scheduledTask;
+
+    protected abstract CalculatedStatus calculateStatus();
+
+    @Override
+    public void setEntity(EntityLocal entity) {
+        super.setEntity(entity);
+
+        if (isRunning()) {
+            doStartPolling();
+        }
+    }
+
+    /**
+     * Cancels the polling task (if one was ever started) and suspends the policy.
+     */
+    @Override
+    public void suspend() {
+        // scheduledTask is only assigned by doStartPolling(); it is still null when the
+        // policy is suspended before polling has started, so guard against an NPE.
+        if (scheduledTask != null) {
+            scheduledTask.cancel(true);
+        }
+        super.suspend();
+    }
+
+    @Override
+    public void resume() {
+
currentFailureStartTime = null; + currentRecoveryStartTime = null; + lastPublished = LastPublished.NONE; + executorQueued.set(false); + executorTime = 0; + + super.resume(); + doStartPolling(); + } + + protected void doStartPolling() { + if (scheduledTask == null || scheduledTask.isDone()) { + ScheduledTask task = new ScheduledTask(MutableMap.of("period", getPollPeriod(), "displayName", getTaskName()), pollingTaskFactory); + scheduledTask = ((EntityInternal)entity).getExecutionContext().submit(task);; + } + } + + private String getTaskName() { + return getDisplayName(); + } + + protected Duration getPollPeriod() { + return getConfig(POLL_PERIOD); + } + + protected Duration getFailedStabilizationDelay() { + return getConfig(FAILED_STABILIZATION_DELAY); + } + + protected Duration getRecoveredStabilizationDelay() { + return getConfig(RECOVERED_STABILIZATION_DELAY); + } + + protected Sensor<FailureDescriptor> getSensorFailed() { + return getConfig(SENSOR_FAILED); + } + + protected Sensor<FailureDescriptor> getSensorRecovered() { + return getConfig(SENSOR_RECOVERED); + } + + private synchronized void checkHealth() { + CalculatedStatus status = calculateStatus(); + boolean healthy = status.isHealthy(); + long now = System.currentTimeMillis(); + + if (healthy) { + stateLastGood.set(now); + if (lastPublished == LastPublished.FAILED) { + if (currentRecoveryStartTime == null) { + LOG.info("{} check for {}, now recovering: {}", new Object[] {this, entity, getDescription(status)}); + currentRecoveryStartTime = now; + schedulePublish(); + } else { + if (LOG.isTraceEnabled()) LOG.trace("{} check for {}, continuing recovering: {}", new Object[] {this, entity, getDescription(status)}); + } + } else { + if (currentFailureStartTime != null) { + LOG.info("{} check for {}, now healthy: {}", new Object[] {this, entity, getDescription(status)}); + currentFailureStartTime = null; + } else { + if (LOG.isTraceEnabled()) LOG.trace("{} check for {}, still healthy: {}", new Object[] {this, 
entity, getDescription(status)}); + } + } + } else { + stateLastFail.set(now); + if (lastPublished != LastPublished.FAILED) { + if (currentFailureStartTime == null) { + LOG.info("{} check for {}, now failing: {}", new Object[] {this, entity, getDescription(status)}); + currentFailureStartTime = now; + schedulePublish(); + } else { + if (LOG.isTraceEnabled()) LOG.trace("{} check for {}, continuing failing: {}", new Object[] {this, entity, getDescription(status)}); + } + } else { + if (currentRecoveryStartTime != null) { + LOG.info("{} check for {}, now failing: {}", new Object[] {this, entity, getDescription(status)}); + currentRecoveryStartTime = null; + } else { + if (LOG.isTraceEnabled()) LOG.trace("{} check for {}, still failed: {}", new Object[] {this, entity, getDescription(status)}); + } + } + } + } + + protected void schedulePublish() { + schedulePublish(0); + } + + protected void schedulePublish(long delay) { + if (isRunning() && executorQueued.compareAndSet(false, true)) { + long now = System.currentTimeMillis(); + delay = Math.max(0, Math.max(delay, (executorTime + MIN_PERIOD_BETWEEN_EXECS_MILLIS) - now)); + if (LOG.isTraceEnabled()) LOG.trace("{} scheduling publish in {}ms", this, delay); + + Runnable job = new PublishJob(); + + ScheduledTask task = new ScheduledTask(MutableMap.of("delay", Duration.of(delay, TimeUnit.MILLISECONDS)), new BasicTask<Void>(job)); + ((EntityInternal)entity).getExecutionContext().submit(task); + } + } + + private synchronized void publishNow() { + if (!isRunning()) return; + + CalculatedStatus calculatedStatus = calculateStatus(); + boolean healthy = calculatedStatus.isHealthy(); + + Long lastUpTime = stateLastGood.get(); + Long lastDownTime = stateLastFail.get(); + long serviceFailedStabilizationDelay = getFailedStabilizationDelay().toMilliseconds(); + long serviceRecoveredStabilizationDelay = getRecoveredStabilizationDelay().toMilliseconds(); + long now = System.currentTimeMillis(); + + if (healthy) { + if (lastPublished == 
LastPublished.FAILED) {
+                // only publish if consistently up for serviceRecoveredStabilizationDelay
+                long currentRecoveryPeriod = getTimeDiff(now, currentRecoveryStartTime);
+                long sinceLastDownPeriod = getTimeDiff(now, lastDownTime);
+                if (currentRecoveryPeriod > serviceRecoveredStabilizationDelay && sinceLastDownPeriod > serviceRecoveredStabilizationDelay) {
+                    String description = getDescription(calculatedStatus);
+                    LOG.warn("{} check for {}, publishing recovered: {}", new Object[] {this, entity, description});
+                    entity.emit(getSensorRecovered(), new HASensors.FailureDescriptor(entity, description));
+                    lastPublished = LastPublished.RECOVERED;
+                    currentFailureStartTime = null;
+                } else {
+                    // not stable for long enough yet: retry once the longer of the two remaining waits has elapsed
+                    long nextAttemptTime = Math.max(serviceRecoveredStabilizationDelay - currentRecoveryPeriod, serviceRecoveredStabilizationDelay - sinceLastDownPeriod);
+                    schedulePublish(nextAttemptTime);
+                }
+            }
+        } else {
+            if (lastPublished != LastPublished.FAILED) {
+                // only publish if consistently down for serviceFailedStabilizationDelay
+                long currentFailurePeriod = getTimeDiff(now, currentFailureStartTime);
+                long sinceLastUpPeriod = getTimeDiff(now, lastUpTime);
+                if (currentFailurePeriod > serviceFailedStabilizationDelay && sinceLastUpPeriod > serviceFailedStabilizationDelay) {
+                    String description = getDescription(calculatedStatus);
+                    LOG.warn("{} connectivity-check for {}, publishing failed: {}", new Object[] {this, entity, description});
+                    entity.emit(getSensorFailed(), new HASensors.FailureDescriptor(entity, description));
+                    lastPublished = LastPublished.FAILED;
+                    currentRecoveryStartTime = null;
+                } else {
+                    // not stable for long enough yet: retry once the longer of the two remaining waits has elapsed
+                    long nextAttemptTime = Math.max(serviceFailedStabilizationDelay - currentFailurePeriod, serviceFailedStabilizationDelay - sinceLastUpPeriod);
+                    schedulePublish(nextAttemptTime);
+                }
+            }
+        }
+    }
+
+    /**
+     * Builds the description used in log messages and in emitted failure descriptors:
+     * the status' own description followed by the health flag, the times of the last
+     * healthy and last failed checks, the last published state and the current
+     * failure/recovery periods with their stabilization delays.
+     */
+    protected String getDescription(CalculatedStatus status) {
+        Long lastUpTime = stateLastGood.get();
+        // the last-down time is tracked in stateLastFail; reading stateLastGood here
+        // made "lastDown" repeat the last-up time
+        Long lastDownTime = stateLastFail.get();
+        Duration
serviceFailedStabilizationDelay = getFailedStabilizationDelay(); + Duration serviceRecoveredStabilizationDelay = getRecoveredStabilizationDelay(); + + return String.format("%s; healthy=%s; timeNow=%s; lastUp=%s; lastDown=%s; lastPublished=%s; "+ + "currentFailurePeriod=%s; currentRecoveryPeriod=%s", + status.getDescription(), + status.isHealthy(), + Time.makeDateString(System.currentTimeMillis()), + (lastUpTime != null ? Time.makeDateString(lastUpTime) : "<never>"), + (lastDownTime != null ? Time.makeDateString(lastDownTime) : "<never>"), + lastPublished, + (currentFailureStartTime != null ? getTimeStringSince(currentFailureStartTime) : "<none>") + " (stabilization "+makeTimeStringRounded(serviceFailedStabilizationDelay) + ")", + (currentRecoveryStartTime != null ? getTimeStringSince(currentRecoveryStartTime) : "<none>") + " (stabilization "+makeTimeStringRounded(serviceRecoveredStabilizationDelay) + ")"); + } + + private long getTimeDiff(Long recent, Long previous) { + return (previous == null) ? recent : (recent - previous); + } + + private String getTimeStringSince(Long time) { + return time == null ? null : Time.makeTimeStringRounded(System.currentTimeMillis() - time); + } +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/dd0c2348/policy/src/main/java/brooklyn/policy/ha/ConditionalSuspendPolicy.java ---------------------------------------------------------------------- diff --git a/policy/src/main/java/brooklyn/policy/ha/ConditionalSuspendPolicy.java b/policy/src/main/java/brooklyn/policy/ha/ConditionalSuspendPolicy.java new file mode 100644 index 0000000..6e37ef9 --- /dev/null +++ b/policy/src/main/java/brooklyn/policy/ha/ConditionalSuspendPolicy.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package brooklyn.policy.ha; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import brooklyn.config.ConfigKey; +import brooklyn.entity.basic.ConfigKeys; +import brooklyn.entity.basic.EntityLocal; +import brooklyn.event.Sensor; +import brooklyn.event.SensorEvent; +import brooklyn.event.SensorEventListener; +import brooklyn.policy.Policy; +import brooklyn.policy.basic.AbstractPolicy; +import brooklyn.util.flags.SetFromFlag; + +import com.google.common.base.Preconditions; + +public class ConditionalSuspendPolicy extends AbstractPolicy { + private static final Logger LOG = LoggerFactory.getLogger(ConditionalSuspendPolicy.class); + + @SetFromFlag("suppressSensor") + @SuppressWarnings({ "rawtypes", "unchecked" }) + public static final ConfigKey<Sensor<?>> SUSPEND_SENSOR = (ConfigKey) ConfigKeys.newConfigKey(Sensor.class, + "suppressSensor", "Sensor which will suppress the target policy", HASensors.CONNECTION_FAILED); + + @SetFromFlag("resetSensor") + @SuppressWarnings({ "rawtypes", "unchecked" }) + public static final ConfigKey<Sensor<?>> RESUME_SENSOR = (ConfigKey) ConfigKeys.newConfigKey(Sensor.class, + "resetSensor", "Resume target policy when this sensor is observed", HASensors.CONNECTION_RECOVERED); + + @SetFromFlag("target") + public static final ConfigKey<Object> SUSPEND_TARGET = ConfigKeys.newConfigKey(Object.class, + "target", "The target policy to suspend. 
Either direct reference or the value of the suspendTarget config on a policy from the same entity."); + + @Override + public void setEntity(EntityLocal entity) { + super.setEntity(entity); + Object target = config().get(SUSPEND_TARGET); + Preconditions.checkNotNull(target, "Suspend target required"); + Preconditions.checkNotNull(getTargetPolicy(), "Can't find target policy set in " + SUSPEND_TARGET.getName() + ": " + target); + subscribe(); + } + + private void subscribe() { + subscribe(entity, getConfig(SUSPEND_SENSOR), new SensorEventListener<Object>() { + @Override public void onEvent(final SensorEvent<Object> event) { + if (isRunning()) { + Policy target = getTargetPolicy(); + target.suspend(); + LOG.debug("Suspended policy " + target + ", triggered by " + event.getSensor() + " = " + event.getValue()); + } + } + + }); + subscribe(entity, getConfig(RESUME_SENSOR), new SensorEventListener<Object>() { + @Override public void onEvent(final SensorEvent<Object> event) { + if (isRunning()) { + Policy target = getTargetPolicy(); + target.resume(); + LOG.debug("Resumed policy " + target + ", triggered by " + event.getSensor() + " = " + event.getValue()); + } + } + }); + } + + private Policy getTargetPolicy() { + Object target = config().get(SUSPEND_TARGET); + if (target instanceof Policy) { + return (Policy)target; + } else if (target instanceof String) { + for (Policy policy : entity.getPolicies()) { + // No way to set config values for keys NOT declared in the policy, + // so must use displayName as a generally available config value. 
+ if (target.equals(policy.getDisplayName()) || target.equals(policy.getClass().getName())) { + return policy; + } + } + } else { + throw new IllegalStateException("Unexpected type " + target.getClass() + " for target " + target); + } + return null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/dd0c2348/policy/src/main/java/brooklyn/policy/ha/ConnectionFailureDetector.java ---------------------------------------------------------------------- diff --git a/policy/src/main/java/brooklyn/policy/ha/ConnectionFailureDetector.java b/policy/src/main/java/brooklyn/policy/ha/ConnectionFailureDetector.java index acd371d..a92b8a8 100644 --- a/policy/src/main/java/brooklyn/policy/ha/ConnectionFailureDetector.java +++ b/policy/src/main/java/brooklyn/policy/ha/ConnectionFailureDetector.java @@ -18,35 +18,17 @@ */ package brooklyn.policy.ha; -import static brooklyn.util.time.Time.makeTimeStringRounded; - -import java.util.concurrent.Callable; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import brooklyn.catalog.Catalog; import brooklyn.config.ConfigKey; -import brooklyn.entity.basic.BrooklynTaskTags; import brooklyn.entity.basic.ConfigKeys; -import brooklyn.entity.basic.EntityInternal; -import brooklyn.entity.basic.EntityLocal; +import brooklyn.event.Sensor; import brooklyn.event.basic.BasicConfigKey; import brooklyn.event.basic.BasicNotificationSensor; -import brooklyn.management.Task; -import brooklyn.policy.basic.AbstractPolicy; import brooklyn.policy.ha.HASensors.FailureDescriptor; -import brooklyn.util.collections.MutableMap; -import brooklyn.util.exceptions.Exceptions; import brooklyn.util.flags.SetFromFlag; +import brooklyn.util.guava.Maybe; import brooklyn.util.net.Networking; -import brooklyn.util.task.BasicTask; -import brooklyn.util.task.ScheduledTask; import brooklyn.util.time.Duration; 
-import brooklyn.util.time.Time; import com.google.common.net.HostAndPort; @@ -56,24 +38,12 @@ import com.google.common.net.HostAndPort; */ @Catalog(name="Connection Failure Detector", description="HA policy for monitoring a host:port, " + "emitting an event if the connection is lost/restored") -public class ConnectionFailureDetector extends AbstractPolicy { - - // TODO Remove duplication from ServiceFailureDetector, particularly for the stabilisation delays. - - public enum LastPublished { - NONE, - FAILED, - RECOVERED; - } - - private static final Logger LOG = LoggerFactory.getLogger(ConnectionFailureDetector.class); - - private static final long MIN_PERIOD_BETWEEN_EXECS_MILLIS = 100; +public class ConnectionFailureDetector extends AbstractFailureDetector { public static final ConfigKey<HostAndPort> ENDPOINT = ConfigKeys.newConfigKey(HostAndPort.class, "connectionFailureDetector.endpoint"); - + public static final ConfigKey<Duration> POLL_PERIOD = ConfigKeys.newConfigKey(Duration.class, "connectionFailureDetector.pollPeriod", "", Duration.ONE_SECOND); - + public static final BasicNotificationSensor<FailureDescriptor> CONNECTION_FAILED = HASensors.CONNECTION_FAILED; public static final BasicNotificationSensor<FailureDescriptor> CONNECTION_RECOVERED = HASensors.CONNECTION_RECOVERED; @@ -95,245 +65,61 @@ public class ConnectionFailureDetector extends AbstractPolicy { .defaultValue(Duration.ZERO) .build(); - protected final AtomicReference<Long> connectionLastUp = new AtomicReference<Long>(); - protected final AtomicReference<Long> connectionLastDown = new AtomicReference<Long>(); - - protected Long currentFailureStartTime = null; - protected Long currentRecoveryStartTime = null; - - protected LastPublished lastPublished = LastPublished.NONE; - - private final AtomicBoolean executorQueued = new AtomicBoolean(false); - private volatile long executorTime = 0; - - private Callable<Task<?>> pollingTaskFactory; - - private Task<?> scheduledTask; - - public 
ConnectionFailureDetector() { - } - @Override public void init() { + super.init(); getRequiredConfig(ENDPOINT); // just to confirm it's set, failing fast - - pollingTaskFactory = new Callable<Task<?>>() { - @Override public Task<?> call() { - BasicTask<Void> task = new BasicTask<Void>(new Runnable() { - @Override public void run() { - checkHealth(); - }}); - BrooklynTaskTags.setTransient(task); - return task; - } - }; - } - - @Override - public void setEntity(EntityLocal entity) { - super.setEntity(entity); - - if (isRunning()) { - doStartPolling(); + if (config().getRaw(SENSOR_FAILED).isAbsent()) { + config().set(SENSOR_FAILED, CONNECTION_FAILED); + } + if (config().getRaw(SENSOR_RECOVERED).isAbsent()) { + config().set(SENSOR_RECOVERED, CONNECTION_RECOVERED); } } @Override - public void suspend() { - scheduledTask.cancel(true); - super.suspend(); + protected CalculatedStatus calculateStatus() { + HostAndPort endpoint = getConfig(ENDPOINT); + boolean isHealthy = Networking.isReachable(endpoint); + return new BasicCalculatedStatus(isHealthy, "endpoint=" + endpoint); } - + + //Persistence compatibility overrides @Override - public void resume() { - currentFailureStartTime = null; - currentRecoveryStartTime = null; - lastPublished = LastPublished.NONE; - executorQueued.set(false); - executorTime = 0; - - super.resume(); - doStartPolling(); - } - - protected void doStartPolling() { - if (scheduledTask == null || scheduledTask.isDone()) { - ScheduledTask task = new ScheduledTask(MutableMap.of("period", getConfig(POLL_PERIOD)), pollingTaskFactory); - scheduledTask = ((EntityInternal)entity).getExecutionContext().submit(task); - } + protected Duration getPollPeriod() { + return getConfig(POLL_PERIOD); } - - private Duration getConnectionFailedStabilizationDelay() { + + @Override + protected Duration getFailedStabilizationDelay() { return getConfig(CONNECTION_FAILED_STABILIZATION_DELAY); } - private Duration getConnectionRecoveredStabilizationDelay() { + @Override + 
protected Duration getRecoveredStabilizationDelay() { return getConfig(CONNECTION_RECOVERED_STABILIZATION_DELAY); } - private synchronized void checkHealth() { - CalculatedStatus status = calculateStatus(); - boolean connected = status.connected; - long now = System.currentTimeMillis(); - - if (connected) { - connectionLastUp.set(now); - if (lastPublished == LastPublished.FAILED) { - if (currentRecoveryStartTime == null) { - LOG.info("{} connectivity-check for {}, now recovering: {}", new Object[] {this, entity, status.getDescription()}); - currentRecoveryStartTime = now; - schedulePublish(); - } else { - if (LOG.isTraceEnabled()) LOG.trace("{} connectivity-check for {}, continuing recovering: {}", new Object[] {this, entity, status.getDescription()}); - } - } else { - if (currentFailureStartTime != null) { - LOG.info("{} connectivity-check for {}, now healthy: {}", new Object[] {this, entity, status.getDescription()}); - currentFailureStartTime = null; - } else { - if (LOG.isTraceEnabled()) LOG.trace("{} connectivity-check for {}, still healthy: {}", new Object[] {this, entity, status.getDescription()}); - } - } + @SuppressWarnings("unchecked") + @Override + protected Sensor<FailureDescriptor> getSensorFailed() { + Maybe<Object> sensorFailed = config().getRaw(SENSOR_FAILED); + if (sensorFailed.isPresent()) { + return (Sensor<FailureDescriptor>)sensorFailed.get(); } else { - connectionLastDown.set(now); - if (lastPublished != LastPublished.FAILED) { - if (currentFailureStartTime == null) { - LOG.info("{} connectivity-check for {}, now failing: {}", new Object[] {this, entity, status.getDescription()}); - currentFailureStartTime = now; - schedulePublish(); - } else { - if (LOG.isTraceEnabled()) LOG.trace("{} connectivity-check for {}, continuing failing: {}", new Object[] {this, entity, status.getDescription()}); - } - } else { - if (currentRecoveryStartTime != null) { - LOG.info("{} connectivity-check for {}, now failing: {}", new Object[] {this, entity, 
status.getDescription()}); - currentRecoveryStartTime = null; - } else { - if (LOG.isTraceEnabled()) LOG.trace("{} connectivity-check for {}, still failed: {}", new Object[] {this, entity, status.getDescription()}); - } - } + return CONNECTION_FAILED; } } - - protected CalculatedStatus calculateStatus() { - return new CalculatedStatus(); - } - - protected void schedulePublish() { - schedulePublish(0); - } - - protected void schedulePublish(long delay) { - if (isRunning() && executorQueued.compareAndSet(false, true)) { - long now = System.currentTimeMillis(); - delay = Math.max(0, Math.max(delay, (executorTime + MIN_PERIOD_BETWEEN_EXECS_MILLIS) - now)); - if (LOG.isTraceEnabled()) LOG.trace("{} scheduling publish in {}ms", this, delay); - - Runnable job = new Runnable() { - @Override public void run() { - try { - executorTime = System.currentTimeMillis(); - executorQueued.set(false); - publishNow(); - - } catch (Exception e) { - if (isRunning()) { - LOG.error("Problem resizing: "+e, e); - } else { - if (LOG.isDebugEnabled()) LOG.debug("Problem resizing, but no longer running: "+e, e); - } - } catch (Throwable t) { - LOG.error("Problem in service-failure-detector: "+t, t); - throw Exceptions.propagate(t); - } - } - }; - - ScheduledTask task = new ScheduledTask(MutableMap.of("delay", Duration.of(delay, TimeUnit.MILLISECONDS)), new BasicTask(job)); - ((EntityInternal)entity).getExecutionContext().submit(task); - } - } - - private synchronized void publishNow() { - if (!isRunning()) return; - - CalculatedStatus calculatedStatus = calculateStatus(); - boolean connected = calculatedStatus.connected; - - Long lastUpTime = connectionLastUp.get(); - Long lastDownTime = connectionLastDown.get(); - long serviceFailedStabilizationDelay = getConnectionFailedStabilizationDelay().toMilliseconds(); - long serviceRecoveredStabilizationDelay = getConnectionRecoveredStabilizationDelay().toMilliseconds(); - long now = System.currentTimeMillis(); - - if (connected) { - if (lastPublished 
== LastPublished.FAILED) { - // only publish if consistently up for serviceRecoveredStabilizationDelay - long currentRecoveryPeriod = getTimeDiff(now, currentRecoveryStartTime); - long sinceLastDownPeriod = getTimeDiff(now, lastDownTime); - if (currentRecoveryPeriod > serviceRecoveredStabilizationDelay && sinceLastDownPeriod > serviceRecoveredStabilizationDelay) { - String description = calculatedStatus.getDescription(); - LOG.warn("{} connectivity-check for {}, publishing recovered: {}", new Object[] {this, entity, description}); - entity.emit(CONNECTION_RECOVERED, new HASensors.FailureDescriptor(entity, description)); - lastPublished = LastPublished.RECOVERED; - currentFailureStartTime = null; - } else { - long nextAttemptTime = Math.max(serviceRecoveredStabilizationDelay - currentRecoveryPeriod, serviceRecoveredStabilizationDelay - sinceLastDownPeriod); - schedulePublish(nextAttemptTime); - } - } + @SuppressWarnings("unchecked") + @Override + protected Sensor<FailureDescriptor> getSensorRecovered() { + Maybe<Object> sensorRecovered = config().getRaw(SENSOR_RECOVERED); + if (sensorRecovered.isPresent()) { + return (Sensor<FailureDescriptor>)sensorRecovered.get(); } else { - if (lastPublished != LastPublished.FAILED) { - // only publish if consistently down for serviceFailedStabilizationDelay - long currentFailurePeriod = getTimeDiff(now, currentFailureStartTime); - long sinceLastUpPeriod = getTimeDiff(now, lastUpTime); - if (currentFailurePeriod > serviceFailedStabilizationDelay && sinceLastUpPeriod > serviceFailedStabilizationDelay) { - String description = calculatedStatus.getDescription(); - LOG.warn("{} connectivity-check for {}, publishing failed: {}", new Object[] {this, entity, description}); - entity.emit(CONNECTION_FAILED, new HASensors.FailureDescriptor(entity, description)); - lastPublished = LastPublished.FAILED; - currentRecoveryStartTime = null; - } else { - long nextAttemptTime = Math.max(serviceFailedStabilizationDelay - currentFailurePeriod, 
serviceFailedStabilizationDelay - sinceLastUpPeriod); - schedulePublish(nextAttemptTime); - } - } + return CONNECTION_RECOVERED; } } - public class CalculatedStatus { - public final boolean connected; - - public CalculatedStatus() { - HostAndPort endpoint = getConfig(ENDPOINT); - connected = Networking.isReachable(endpoint); - } - - public String getDescription() { - Long lastUpTime = connectionLastUp.get(); - Long lastDownTime = connectionLastDown.get(); - Duration serviceFailedStabilizationDelay = getConnectionFailedStabilizationDelay(); - Duration serviceRecoveredStabilizationDelay = getConnectionRecoveredStabilizationDelay(); - - return String.format("endpoint=%s; connected=%s; timeNow=%s; lastUp=%s; lastDown=%s; lastPublished=%s; "+ - "currentFailurePeriod=%s; currentRecoveryPeriod=%s", - getConfig(ENDPOINT), - connected, - Time.makeDateString(System.currentTimeMillis()), - (lastUpTime != null ? Time.makeDateString(lastUpTime) : "<never>"), - (lastDownTime != null ? Time.makeDateString(lastDownTime) : "<never>"), - lastPublished, - (currentFailureStartTime != null ? getTimeStringSince(currentFailureStartTime) : "<none>") + " (stabilization "+makeTimeStringRounded(serviceFailedStabilizationDelay) + ")", - (currentRecoveryStartTime != null ? getTimeStringSince(currentRecoveryStartTime) : "<none>") + " (stabilization "+makeTimeStringRounded(serviceRecoveredStabilizationDelay) + ")"); - } - } - - private long getTimeDiff(Long recent, Long previous) { - return (previous == null) ? recent : (recent - previous); - } - - private String getTimeStringSince(Long time) { - return time == null ? 
null : Time.makeTimeStringRounded(System.currentTimeMillis() - time); - } } http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/dd0c2348/policy/src/main/java/brooklyn/policy/ha/SshMachineFailureDetector.java ---------------------------------------------------------------------- diff --git a/policy/src/main/java/brooklyn/policy/ha/SshMachineFailureDetector.java b/policy/src/main/java/brooklyn/policy/ha/SshMachineFailureDetector.java new file mode 100644 index 0000000..435592e --- /dev/null +++ b/policy/src/main/java/brooklyn/policy/ha/SshMachineFailureDetector.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package brooklyn.policy.ha; + +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import brooklyn.catalog.Catalog; +import brooklyn.config.ConfigKey; +import brooklyn.entity.basic.ConfigKeys; +import brooklyn.event.basic.BasicNotificationSensor; +import brooklyn.location.basic.Machines; +import brooklyn.location.basic.SshMachineLocation; +import brooklyn.policy.ha.AbstractFailureDetector.LastPublished; +import brooklyn.policy.ha.HASensors.FailureDescriptor; +import brooklyn.util.exceptions.Exceptions; +import brooklyn.util.guava.Maybe; +import brooklyn.util.internal.ssh.SshTool; +import brooklyn.util.time.Duration; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; + +@Catalog(name="Ssh Connectivity Failure Detector", description="HA policy for monitoring an SshMachine, " + + "emitting an event if the connection is lost/restored") +public class SshMachineFailureDetector extends AbstractFailureDetector { + private static final Logger LOG = LoggerFactory.getLogger(SshMachineFailureDetector.class); + + public static final BasicNotificationSensor<FailureDescriptor> CONNECTION_FAILED = HASensors.CONNECTION_FAILED; + + public static final BasicNotificationSensor<FailureDescriptor> CONNECTION_RECOVERED = HASensors.CONNECTION_RECOVERED; + + public static final ConfigKey<Duration> CONNECT_TIMEOUT = ConfigKeys.newDurationConfigKey( + "ha.sshConnection.timeout", "How long to wait for conneciton before declaring failure", Duration.TEN_SECONDS); + + @Override + public void init() { + super.init(); + if (config().getRaw(SENSOR_FAILED).isAbsent()) { + config().set(SENSOR_FAILED, CONNECTION_FAILED); + } + if (config().getRaw(SENSOR_RECOVERED).isAbsent()) { + config().set(SENSOR_RECOVERED, CONNECTION_RECOVERED); + } + if (config().getRaw(POLL_PERIOD).isAbsent()) { + config().set(POLL_PERIOD, Duration.ONE_MINUTE); + } + } + + @Override + protected CalculatedStatus calculateStatus() { + 
Maybe<SshMachineLocation> sshMachineOption = Machines.findUniqueSshMachineLocation(entity.getLocations()); + if (sshMachineOption.isPresent()) { + SshMachineLocation sshMachine = sshMachineOption.get(); + try { + Duration timeout = config().get(CONNECT_TIMEOUT); + Map<String, ?> flags = ImmutableMap.of( + SshTool.PROP_CONNECT_TIMEOUT.getName(), timeout.toMilliseconds(), + SshTool.PROP_SESSION_TIMEOUT.getName(), timeout.toMilliseconds(), + SshTool.PROP_SSH_TRIES.getName(), 1); + int exitCode = sshMachine.execCommands(flags, SshMachineFailureDetector.class.getName(), ImmutableList.of("exit")); + return new BasicCalculatedStatus(exitCode == 0, sshMachine.toString()); + } catch (Exception e) { + Exceptions.propagateIfFatal(e); + boolean isFirstFailure = lastPublished != LastPublished.FAILED && currentFailureStartTime == null; + if (isFirstFailure) { + if (LOG.isDebugEnabled()) { + LOG.debug("Failed connecting to machine " + sshMachine, e); + } + } else { + if (LOG.isTraceEnabled()) { + LOG.trace("Failed connecting to machine " + sshMachine, e); + } + } + return new BasicCalculatedStatus(false, e.getMessage()); + } + } else { + return new BasicCalculatedStatus(true, "no machine started, not complaining"); + } + } +}
