http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/brooklyn/policy/ha/ConnectionFailureDetector.java ---------------------------------------------------------------------- diff --git a/policy/src/main/java/brooklyn/policy/ha/ConnectionFailureDetector.java b/policy/src/main/java/brooklyn/policy/ha/ConnectionFailureDetector.java deleted file mode 100644 index ff5d60e..0000000 --- a/policy/src/main/java/brooklyn/policy/ha/ConnectionFailureDetector.java +++ /dev/null @@ -1,126 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.policy.ha; - -import org.apache.brooklyn.api.catalog.Catalog; -import org.apache.brooklyn.api.event.Sensor; -import org.apache.brooklyn.core.util.flags.SetFromFlag; - -import brooklyn.config.ConfigKey; -import brooklyn.entity.basic.ConfigKeys; -import brooklyn.event.basic.BasicConfigKey; -import brooklyn.event.basic.BasicNotificationSensor; -import brooklyn.policy.ha.HASensors.FailureDescriptor; -import brooklyn.util.guava.Maybe; -import brooklyn.util.net.Networking; -import brooklyn.util.time.Duration; - -import com.google.common.net.HostAndPort; - -/** - * Monitors a given {@link HostAndPort}, to emit HASensors.CONNECTION_FAILED and HASensors.CONNECTION_RECOVERED - * if the connection is lost/restored. - */ -@Catalog(name="Connection Failure Detector", description="HA policy for monitoring a host:port, " - + "emitting an event if the connection is lost/restored") -public class ConnectionFailureDetector extends AbstractFailureDetector { - - public static final ConfigKey<HostAndPort> ENDPOINT = ConfigKeys.newConfigKey(HostAndPort.class, "connectionFailureDetector.endpoint"); - - public static final ConfigKey<Duration> POLL_PERIOD = ConfigKeys.newConfigKey(Duration.class, "connectionFailureDetector.pollPeriod", "", Duration.ONE_SECOND); - - public static final BasicNotificationSensor<FailureDescriptor> CONNECTION_FAILED = HASensors.CONNECTION_FAILED; - - public static final BasicNotificationSensor<FailureDescriptor> CONNECTION_RECOVERED = HASensors.CONNECTION_RECOVERED; - - @SetFromFlag("connectionFailedStabilizationDelay") - public static final ConfigKey<Duration> CONNECTION_FAILED_STABILIZATION_DELAY = BasicConfigKey.builder(Duration.class) - .name("connectionFailureDetector.serviceFailedStabilizationDelay") - .description("Time period for which the connection must be consistently down for " - + "(e.g. doesn't report down-up-down) before concluding failure. " - + "Note that long TCP timeouts mean there can be long (e.g. 70 second) " - + "delays in noticing a connection refused condition.") - .defaultValue(Duration.ZERO) - .build(); - - @SetFromFlag("connectionRecoveredStabilizationDelay") - public static final ConfigKey<Duration> CONNECTION_RECOVERED_STABILIZATION_DELAY = BasicConfigKey.builder(Duration.class) - .name("connectionFailureDetector.serviceRecoveredStabilizationDelay") - .description("For a failed connection, time period for which the connection must be consistently up for (e.g. doesn't report up-down-up) before concluding recovered") - .defaultValue(Duration.ZERO) - .build(); - - @Override - public void init() { - super.init(); - getRequiredConfig(ENDPOINT); // just to confirm it's set, failing fast - if (config().getRaw(SENSOR_FAILED).isAbsent()) { - config().set(SENSOR_FAILED, CONNECTION_FAILED); - } - if (config().getRaw(SENSOR_RECOVERED).isAbsent()) { - config().set(SENSOR_RECOVERED, CONNECTION_RECOVERED); - } - } - - @Override - protected CalculatedStatus calculateStatus() { - HostAndPort endpoint = getConfig(ENDPOINT); - boolean isHealthy = Networking.isReachable(endpoint); - return new BasicCalculatedStatus(isHealthy, "endpoint=" + endpoint); - } - - //Persistence compatibility overrides - @Override - protected Duration getPollPeriod() { - return getConfig(POLL_PERIOD); - } - - @Override - protected Duration getFailedStabilizationDelay() { - return getConfig(CONNECTION_FAILED_STABILIZATION_DELAY); - } - - @Override - protected Duration getRecoveredStabilizationDelay() { - return getConfig(CONNECTION_RECOVERED_STABILIZATION_DELAY); - } - - @SuppressWarnings("unchecked") - @Override - protected Sensor<FailureDescriptor> getSensorFailed() { - Maybe<Object> sensorFailed = config().getRaw(SENSOR_FAILED); - if (sensorFailed.isPresent()) { - return (Sensor<FailureDescriptor>)sensorFailed.get(); - } else { - return CONNECTION_FAILED; - } - } - - @SuppressWarnings("unchecked") - @Override - protected Sensor<FailureDescriptor> getSensorRecovered() { - Maybe<Object> sensorRecovered = config().getRaw(SENSOR_RECOVERED); - if (sensorRecovered.isPresent()) { - return (Sensor<FailureDescriptor>)sensorRecovered.get(); - } else { - return CONNECTION_RECOVERED; - } - } - -}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/brooklyn/policy/ha/HASensors.java ---------------------------------------------------------------------- diff --git a/policy/src/main/java/brooklyn/policy/ha/HASensors.java b/policy/src/main/java/brooklyn/policy/ha/HASensors.java deleted file mode 100644 index b940aa0..0000000 --- a/policy/src/main/java/brooklyn/policy/ha/HASensors.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.policy.ha; - -import brooklyn.event.basic.BasicNotificationSensor; - -import com.google.common.base.Objects; - -public class HASensors { - - public static final BasicNotificationSensor<FailureDescriptor> ENTITY_FAILED = new BasicNotificationSensor<FailureDescriptor>( - FailureDescriptor.class, "ha.entityFailed", "Indicates that an entity has failed"); - - public static final BasicNotificationSensor<FailureDescriptor> ENTITY_RECOVERED = new BasicNotificationSensor<FailureDescriptor>( - FailureDescriptor.class, "ha.entityRecovered", "Indicates that a previously failed entity has recovered"); - - public static final BasicNotificationSensor<FailureDescriptor> CONNECTION_FAILED = new BasicNotificationSensor<FailureDescriptor>( - FailureDescriptor.class, "ha.connectionFailed", "Indicates that a connection has failed"); - - public static final BasicNotificationSensor<FailureDescriptor> CONNECTION_RECOVERED = new BasicNotificationSensor<FailureDescriptor>( - FailureDescriptor.class, "ha.connectionRecovered", "Indicates that a previously failed connection has recovered"); - - // TODO How to make this serializable with the entity reference - public static class FailureDescriptor { - private final Object component; - private final String description; - - public FailureDescriptor(Object component, String description) { - this.component = component; - this.description = description; - } - - public Object getComponent() { - return component; - } - - public String getDescription() { - return description; - } - - @Override - public String toString() { - return Objects.toStringHelper(this).add("component", component).add("description", description).toString(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/brooklyn/policy/ha/ServiceFailureDetector.java ---------------------------------------------------------------------- diff --git a/policy/src/main/java/brooklyn/policy/ha/ServiceFailureDetector.java b/policy/src/main/java/brooklyn/policy/ha/ServiceFailureDetector.java deleted file mode 100644 index 2e4b719..0000000 --- a/policy/src/main/java/brooklyn/policy/ha/ServiceFailureDetector.java +++ /dev/null @@ -1,340 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.policy.ha; - -import java.util.Map; -import java.util.Map.Entry; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - -import org.apache.brooklyn.api.event.SensorEvent; -import org.apache.brooklyn.core.util.config.ConfigBag; -import org.apache.brooklyn.core.util.flags.SetFromFlag; -import org.apache.brooklyn.core.util.task.BasicTask; -import org.apache.brooklyn.core.util.task.ScheduledTask; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import brooklyn.config.ConfigKey; -import brooklyn.entity.basic.Attributes; -import brooklyn.entity.basic.ConfigKeys; -import brooklyn.entity.basic.EntityInternal; -import brooklyn.entity.basic.Lifecycle; -import brooklyn.entity.basic.ServiceStateLogic; -import brooklyn.entity.basic.ServiceStateLogic.ComputeServiceState; -import brooklyn.event.basic.BasicConfigKey; -import brooklyn.event.basic.BasicNotificationSensor; -import brooklyn.policy.ha.HASensors.FailureDescriptor; -import brooklyn.util.collections.MutableMap; -import brooklyn.util.exceptions.Exceptions; -import brooklyn.util.time.Duration; -import brooklyn.util.time.Time; - -/** - * Emits {@link HASensors#ENTITY_FAILED} whenever the parent's default logic ({@link ComputeServiceState}) would detect a problem, - * and similarly {@link HASensors#ENTITY_RECOVERED} when recovered. - * <p> - * gives more control over suppressing {@link Lifecycle#ON_FIRE}, - * for some period of time - * (or until another process manually sets {@link Attributes#SERVICE_STATE_ACTUAL} to {@value Lifecycle#ON_FIRE}, - * which this enricher will not clear until all problems have gone away) - */ -//@Catalog(name="Service Failure Detector", description="HA policy for deteting failure of a service") -public class ServiceFailureDetector extends ServiceStateLogic.ComputeServiceState { - - // TODO Remove duplication between this and MemberFailureDetectionPolicy. - // The latter could be re-written to use this. Or could even be deprecated - // in favour of this. - - public enum LastPublished { - NONE, - FAILED, - RECOVERED; - } - - private static final Logger LOG = LoggerFactory.getLogger(ServiceFailureDetector.class); - - private static final long MIN_PERIOD_BETWEEN_EXECS_MILLIS = 100; - - public static final BasicNotificationSensor<FailureDescriptor> ENTITY_FAILED = HASensors.ENTITY_FAILED; - - @SetFromFlag("onlyReportIfPreviouslyUp") - public static final ConfigKey<Boolean> ENTITY_FAILED_ONLY_IF_PREVIOUSLY_UP = ConfigKeys.newBooleanConfigKey("onlyReportIfPreviouslyUp", - "Prevents the policy from emitting ENTITY_FAILED if the entity fails on startup (ie has never been up)", true); - - public static final ConfigKey<Boolean> MONITOR_SERVICE_PROBLEMS = ConfigKeys.newBooleanConfigKey("monitorServiceProblems", - "Whether to monitor service problems, and emit on failures there (if set to false, this monitors only service up)", true); - - @SetFromFlag("serviceOnFireStabilizationDelay") - public static final ConfigKey<Duration> SERVICE_ON_FIRE_STABILIZATION_DELAY = BasicConfigKey.builder(Duration.class) - .name("serviceOnFire.stabilizationDelay") - .description("Time period for which the service must be consistently down for (e.g. doesn't report down-up-down) before concluding ON_FIRE") - .defaultValue(Duration.ZERO) - .build(); - - @SetFromFlag("entityFailedStabilizationDelay") - public static final ConfigKey<Duration> ENTITY_FAILED_STABILIZATION_DELAY = BasicConfigKey.builder(Duration.class) - .name("entityFailed.stabilizationDelay") - .description("Time period for which the service must be consistently down for (e.g. doesn't report down-up-down) before emitting ENTITY_FAILED") - .defaultValue(Duration.ZERO) - .build(); - - @SetFromFlag("entityRecoveredStabilizationDelay") - public static final ConfigKey<Duration> ENTITY_RECOVERED_STABILIZATION_DELAY = BasicConfigKey.builder(Duration.class) - .name("entityRecovered.stabilizationDelay") - .description("For a failed entity, time period for which the service must be consistently up for (e.g. doesn't report up-down-up) before emitting ENTITY_RECOVERED") - .defaultValue(Duration.ZERO) - .build(); - - @SetFromFlag("entityFailedRepublishTime") - public static final ConfigKey<Duration> ENTITY_FAILED_REPUBLISH_TIME = BasicConfigKey.builder(Duration.class) - .name("entityFailed.republishTime") - .description("Publish failed state periodically at the specified intervals, null to disable.") - .build(); - - protected Long firstUpTime; - - protected Long currentFailureStartTime = null; - protected Long currentRecoveryStartTime = null; - - protected Long publishEntityFailedTime = null; - protected Long publishEntityRecoveredTime = null; - protected Long setEntityOnFireTime = null; - - protected LastPublished lastPublished = LastPublished.NONE; - - private final AtomicBoolean executorQueued = new AtomicBoolean(false); - private volatile long executorTime = 0; - - /** - * TODO Really don't want this mutex! - * ServiceStateLogic.setExpectedState() will call into `onEvent(null)`, so could get concurrent calls. - * How to handle that? I don't think `ServiceStateLogic.setExpectedState` should be making the call, but - * presumably that is their to remove a race condition so it is set before method returns. Caller shouldn't - * rely on that though. - * e.g. see `ServiceFailureDetectorTest.testNotifiedOfFailureOnStateOnFire`, where we get two notifications. - */ - private final Object mutex = new Object(); - - public ServiceFailureDetector() { - this(new ConfigBag()); - } - - public ServiceFailureDetector(Map<String,?> flags) { - this(new ConfigBag().putAll(flags)); - } - - public ServiceFailureDetector(ConfigBag configBag) { - // TODO hierarchy should use ConfigBag, and not change flags - super(configBag.getAllConfigMutable()); - } - - @Override - public void onEvent(SensorEvent<Object> event) { - if (firstUpTime==null) { - if (event!=null && Attributes.SERVICE_UP.equals(event.getSensor()) && Boolean.TRUE.equals(event.getValue())) { - firstUpTime = event.getTimestamp(); - } else if (event == null && Boolean.TRUE.equals(entity.getAttribute(Attributes.SERVICE_UP))) { - // If this enricher is registered after the entity is up, then we'll get a "synthetic" onEvent(null) - firstUpTime = System.currentTimeMillis(); - } - } - - super.onEvent(event); - } - - @Override - protected void setActualState(Lifecycle state) { - long now = System.currentTimeMillis(); - - synchronized (mutex) { - if (state==Lifecycle.ON_FIRE) { - if (lastPublished == LastPublished.FAILED) { - if (currentRecoveryStartTime != null) { - if (LOG.isDebugEnabled()) LOG.debug("{} health-check for {}, component was recovering, now failing: {}", new Object[] {this, entity, getExplanation(state)}); - currentRecoveryStartTime = null; - publishEntityRecoveredTime = null; - } else { - if (LOG.isTraceEnabled()) LOG.trace("{} health-check for {}, component still failed: {}", new Object[] {this, entity, getExplanation(state)}); - } - } else { - if (firstUpTime == null && getConfig(ENTITY_FAILED_ONLY_IF_PREVIOUSLY_UP)) { - // suppress; won't publish - } else if (currentFailureStartTime == null) { - if (LOG.isDebugEnabled()) LOG.debug("{} health-check for {}, component now failing: {}", new Object[] {this, entity, getExplanation(state)}); - currentFailureStartTime = now; - publishEntityFailedTime = currentFailureStartTime + getConfig(ENTITY_FAILED_STABILIZATION_DELAY).toMilliseconds(); - } else { - if (LOG.isTraceEnabled()) LOG.trace("{} health-check for {}, component continuing failing: {}", new Object[] {this, entity, getExplanation(state)}); - } - } - if (setEntityOnFireTime == null) { - setEntityOnFireTime = now + getConfig(SERVICE_ON_FIRE_STABILIZATION_DELAY).toMilliseconds(); - } - currentRecoveryStartTime = null; - publishEntityRecoveredTime = null; - - } else if (state == Lifecycle.RUNNING) { - if (lastPublished == LastPublished.FAILED) { - if (currentRecoveryStartTime == null) { - if (LOG.isDebugEnabled()) LOG.debug("{} health-check for {}, component now recovering: {}", new Object[] {this, entity, getExplanation(state)}); - currentRecoveryStartTime = now; - publishEntityRecoveredTime = currentRecoveryStartTime + getConfig(ENTITY_RECOVERED_STABILIZATION_DELAY).toMilliseconds(); - } else { - if (LOG.isTraceEnabled()) LOG.trace("{} health-check for {}, component continuing recovering: {}", new Object[] {this, entity, getExplanation(state)}); - } - } else { - if (currentFailureStartTime != null) { - if (LOG.isDebugEnabled()) LOG.debug("{} health-check for {}, component was failing, now healthy: {}", new Object[] {this, entity, getExplanation(state)}); - } else { - if (LOG.isTraceEnabled()) LOG.trace("{} health-check for {}, component still healthy: {}", new Object[] {this, entity, getExplanation(state)}); - } - } - currentFailureStartTime = null; - publishEntityFailedTime = null; - setEntityOnFireTime = null; - - } else { - if (LOG.isTraceEnabled()) LOG.trace("{} health-check for {}, in unconfirmed sate: {}", new Object[] {this, entity, getExplanation(state)}); - } - - long recomputeIn = Long.MAX_VALUE; // For whether to call recomputeAfterDelay - - if (publishEntityFailedTime != null) { - long delayBeforeCheck = publishEntityFailedTime - now; - if (delayBeforeCheck<=0) { - if (LOG.isDebugEnabled()) LOG.debug("{} publishing failed (state={}; currentFailureStartTime={}; now={}", - new Object[] {this, state, Time.makeDateString(currentFailureStartTime), Time.makeDateString(now)}); - Duration republishDelay = getConfig(ENTITY_FAILED_REPUBLISH_TIME); - if (republishDelay == null) { - publishEntityFailedTime = null; - } else { - publishEntityFailedTime = now + republishDelay.toMilliseconds(); - recomputeIn = Math.min(recomputeIn, republishDelay.toMilliseconds()); - } - lastPublished = LastPublished.FAILED; - entity.emit(HASensors.ENTITY_FAILED, new HASensors.FailureDescriptor(entity, getFailureDescription(now))); - } else { - recomputeIn = Math.min(recomputeIn, delayBeforeCheck); - } - } else if (publishEntityRecoveredTime != null) { - long delayBeforeCheck = publishEntityRecoveredTime - now; - if (delayBeforeCheck<=0) { - if (LOG.isDebugEnabled()) LOG.debug("{} publishing recovered (state={}; currentRecoveryStartTime={}; now={}", - new Object[] {this, state, Time.makeDateString(currentRecoveryStartTime), Time.makeDateString(now)}); - publishEntityRecoveredTime = null; - lastPublished = LastPublished.RECOVERED; - entity.emit(HASensors.ENTITY_RECOVERED, new HASensors.FailureDescriptor(entity, null)); - } else { - recomputeIn = Math.min(recomputeIn, delayBeforeCheck); - } - } - - if (setEntityOnFireTime != null) { - long delayBeforeCheck = setEntityOnFireTime - now; - if (delayBeforeCheck<=0) { - if (LOG.isDebugEnabled()) LOG.debug("{} setting on-fire, now that deferred period has passed (state={})", - new Object[] {this, state}); - setEntityOnFireTime = null; - super.setActualState(state); - } else { - recomputeIn = Math.min(recomputeIn, delayBeforeCheck); - } - } else { - super.setActualState(state); - } - - if (recomputeIn < Long.MAX_VALUE) { - recomputeAfterDelay(recomputeIn); - } - } - } - - protected String getExplanation(Lifecycle state) { - Duration serviceFailedStabilizationDelay = getConfig(ENTITY_FAILED_STABILIZATION_DELAY); - Duration serviceRecoveredStabilizationDelay = getConfig(ENTITY_RECOVERED_STABILIZATION_DELAY); - - return String.format("location=%s; status=%s; lastPublished=%s; timeNow=%s; "+ - "currentFailurePeriod=%s; currentRecoveryPeriod=%s", - entity.getLocations(), - (state != null ? state : "<unreported>"), - lastPublished, - Time.makeDateString(System.currentTimeMillis()), - (currentFailureStartTime != null ? getTimeStringSince(currentFailureStartTime) : "<none>") + " (stabilization "+Time.makeTimeStringRounded(serviceFailedStabilizationDelay) + ")", - (currentRecoveryStartTime != null ? getTimeStringSince(currentRecoveryStartTime) : "<none>") + " (stabilization "+Time.makeTimeStringRounded(serviceRecoveredStabilizationDelay) + ")"); - } - - private String getFailureDescription(long now) { - String description = null; - Map<String, Object> serviceProblems = entity.getAttribute(Attributes.SERVICE_PROBLEMS); - if (serviceProblems!=null && !serviceProblems.isEmpty()) { - Entry<String, Object> problem = serviceProblems.entrySet().iterator().next(); - description = problem.getKey()+": "+problem.getValue(); - if (serviceProblems.size()>1) { - description = serviceProblems.size()+" service problems, including "+description; - } else { - description = "service problem: "+description; - } - } else if (Boolean.FALSE.equals(entity.getAttribute(Attributes.SERVICE_UP))) { - description = "service not up"; - } else { - description = "service failure detected"; - } - if (publishEntityFailedTime!=null && currentFailureStartTime!=null && publishEntityFailedTime > currentFailureStartTime) - description = " (stabilized for "+Duration.of(now - currentFailureStartTime, TimeUnit.MILLISECONDS)+")"; - return description; - } - - @SuppressWarnings({ "unchecked", "rawtypes" }) - protected void recomputeAfterDelay(long delay) { - if (isRunning() && executorQueued.compareAndSet(false, true)) { - long now = System.currentTimeMillis(); - delay = Math.max(0, Math.max(delay, (executorTime + MIN_PERIOD_BETWEEN_EXECS_MILLIS) - now)); - if (LOG.isTraceEnabled()) LOG.trace("{} scheduling publish in {}ms", this, delay); - - Runnable job = new Runnable() { - @Override public void run() { - try { - executorTime = System.currentTimeMillis(); - executorQueued.set(false); - - onEvent(null); - - } catch (Exception e) { - if (isRunning()) { - LOG.error("Error in enricher "+this+": "+e, e); - } else { - if (LOG.isDebugEnabled()) LOG.debug("Error in enricher "+this+" (but no longer running): "+e, e); - } - } catch (Throwable t) { - LOG.error("Error in enricher "+this+": "+t, t); - throw Exceptions.propagate(t); - } - } - }; - - ScheduledTask task = new ScheduledTask(MutableMap.of("delay", Duration.of(delay, TimeUnit.MILLISECONDS)), new BasicTask(job)); - ((EntityInternal)entity).getExecutionContext().submit(task); - } - } - - private String getTimeStringSince(Long time) { - return time == null ? null : Time.makeTimeStringRounded(System.currentTimeMillis() - time); - } -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/brooklyn/policy/ha/ServiceReplacer.java ---------------------------------------------------------------------- diff --git a/policy/src/main/java/brooklyn/policy/ha/ServiceReplacer.java b/policy/src/main/java/brooklyn/policy/ha/ServiceReplacer.java deleted file mode 100644 index 6bea1d4..0000000 --- a/policy/src/main/java/brooklyn/policy/ha/ServiceReplacer.java +++ /dev/null @@ -1,214 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.policy.ha; - -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; - -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.concurrent.TimeUnit; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.brooklyn.api.catalog.Catalog; -import org.apache.brooklyn.api.entity.Entity; -import org.apache.brooklyn.api.entity.Group; -import org.apache.brooklyn.api.entity.basic.EntityLocal; -import org.apache.brooklyn.api.event.Sensor; -import org.apache.brooklyn.api.event.SensorEvent; -import org.apache.brooklyn.api.event.SensorEventListener; -import org.apache.brooklyn.core.policy.basic.AbstractPolicy; -import org.apache.brooklyn.core.util.config.ConfigBag; -import org.apache.brooklyn.core.util.flags.SetFromFlag; - -import brooklyn.config.ConfigKey; -import brooklyn.entity.basic.ConfigKeys; -import brooklyn.entity.basic.Entities; -import brooklyn.entity.basic.EntityInternal; -import brooklyn.entity.basic.ServiceStateLogic.ServiceProblemsLogic; -import brooklyn.entity.group.StopFailedRuntimeException; -import brooklyn.entity.trait.MemberReplaceable; -import brooklyn.event.basic.BasicConfigKey; -import brooklyn.event.basic.BasicNotificationSensor; -import brooklyn.policy.ha.HASensors.FailureDescriptor; -import brooklyn.util.collections.MutableMap; -import brooklyn.util.exceptions.Exceptions; - -import com.google.common.base.Ticker; -import com.google.common.collect.Lists; - -/** attaches to a DynamicCluster and replaces a failed member in response to HASensors.ENTITY_FAILED or other sensor; - * if this fails, it sets the Cluster state to on-fire */ -@Catalog(name="Service Replacer", description="HA policy for replacing a failed member of a group") -public class ServiceReplacer extends AbstractPolicy { - - private static final Logger LOG = LoggerFactory.getLogger(ServiceReplacer.class); - - // TODO if there are multiple failures perhaps we should abort quickly - - public static final BasicNotificationSensor<FailureDescriptor> ENTITY_REPLACEMENT_FAILED = new BasicNotificationSensor<FailureDescriptor>( - FailureDescriptor.class, "ha.entityFailed.replacement", "Indicates that an entity replacement attempt has failed"); - - @SetFromFlag("setOnFireOnFailure") - public static final ConfigKey<Boolean> SET_ON_FIRE_ON_FAILURE = ConfigKeys.newBooleanConfigKey("setOnFireOnFailure", "", true); - - /** monitors this sensor, by default ENTITY_RESTART_FAILED */ - @SetFromFlag("failureSensorToMonitor") - @SuppressWarnings("rawtypes") - public static final ConfigKey<Sensor> FAILURE_SENSOR_TO_MONITOR = new BasicConfigKey<Sensor>(Sensor.class, "failureSensorToMonitor", "", ServiceRestarter.ENTITY_RESTART_FAILED); - - /** skips replace if replacement has failed this many times failure re-occurs within this time interval */ - @SetFromFlag("failOnRecurringFailuresInThisDuration") - public static final ConfigKey<Long> FAIL_ON_RECURRING_FAILURES_IN_THIS_DURATION = ConfigKeys.newLongConfigKey( - "failOnRecurringFailuresInThisDuration", - "abandon replace if replacement has failed many times within this time interval", - 5*60*1000L); - - /** skips replace if replacement has failed this many times failure re-occurs within this time interval */ - @SetFromFlag("failOnNumRecurringFailures") - public static final ConfigKey<Integer> FAIL_ON_NUM_RECURRING_FAILURES = ConfigKeys.newIntegerConfigKey( - "failOnNumRecurringFailures", - "abandon replace if replacement has failed this many times (100% of attempts) within the time interval", - 5); - - @SetFromFlag("ticker") - public static final ConfigKey<Ticker> TICKER = ConfigKeys.newConfigKey(Ticker.class, - "ticker", - "A time source (defaults to system-clock, which is almost certainly what's wanted, except in tests)", - null); - - protected final List<Long> consecutiveReplacementFailureTimes = Lists.newCopyOnWriteArrayList(); - - public ServiceReplacer() { - this(new ConfigBag()); - } - - public ServiceReplacer(Map<String,?> flags) { - this(new ConfigBag().putAll(flags)); - } - - public ServiceReplacer(ConfigBag configBag) { - // TODO hierarchy should use ConfigBag, and not change flags - super(configBag.getAllConfigMutable()); - } - - public ServiceReplacer(Sensor<?> failureSensorToMonitor) { - this(new ConfigBag().configure(FAILURE_SENSOR_TO_MONITOR, failureSensorToMonitor)); - } - - @Override - public void setEntity(final EntityLocal entity) { - checkArgument(entity instanceof MemberReplaceable, "ServiceReplacer must take a MemberReplaceable, not %s", entity); - Sensor<?> failureSensorToMonitor = checkNotNull(getConfig(FAILURE_SENSOR_TO_MONITOR), "failureSensorToMonitor"); - - super.setEntity(entity); - - subscribeToMembers((Group)entity, failureSensorToMonitor, new SensorEventListener<Object>() { - @Override public void onEvent(final SensorEvent<Object> event) { - // Must execute in another thread - if we called entity.replaceMember in the event-listener's thread - // then we'd block all other events being delivered to this entity's other subscribers. - // Relies on synchronization of `onDetectedFailure`. - // See same pattern used in ServiceRestarter. - - // TODO Could use BasicExecutionManager.setTaskSchedulerForTag to prevent race of two - // events being received in rapid succession, and onDetectedFailure being executed out-of-order - // for them; or could write events to a blocking queue and have onDetectedFailure read from that. - - if (isRunning()) { - LOG.warn("ServiceReplacer notified; dispatching job for "+entity+" ("+event.getValue()+")"); - ((EntityInternal)entity).getExecutionContext().submit(MutableMap.of(), new Runnable() { - @Override public void run() { - onDetectedFailure(event); - }}); - } else { - LOG.warn("ServiceReplacer not running, so not acting on failure detected at "+entity+" ("+event.getValue()+", child of "+entity+")"); - } - } - }); - } - - // TODO semaphores would be better to allow at-most-one-blocking behaviour - protected synchronized void onDetectedFailure(SensorEvent<Object> event) { - final Entity failedEntity = event.getSource(); - final Object reason = event.getValue(); - - if (isSuspended()) { - LOG.warn("ServiceReplacer suspended, so not acting on failure detected at "+failedEntity+" ("+reason+", child of "+entity+")"); - return; - } - - if (isRepeatedlyFailingTooMuch()) { - LOG.error("ServiceReplacer not acting on failure detected at "+failedEntity+" ("+reason+", child of "+entity+"), because too many recent replacement failures"); - return; - } - - LOG.warn("ServiceReplacer acting on failure detected at "+failedEntity+" ("+reason+", child of "+entity+")"); - ((EntityInternal)entity).getManagementSupport().getExecutionContext().submit(MutableMap.of(), new Runnable() { - - @Override - public void run() { - try { - Entities.invokeEffectorWithArgs(entity, entity, MemberReplaceable.REPLACE_MEMBER, failedEntity.getId()).get(); - consecutiveReplacementFailureTimes.clear(); - } catch (Exception e) { - if (Exceptions.getFirstThrowableOfType(e, StopFailedRuntimeException.class) != null) { - LOG.info("ServiceReplacer: ignoring error reported from stopping failed node "+failedEntity); - return; - } - onReplacementFailed("Replace failure ("+Exceptions.collapseText(e)+") at "+entity+": "+reason); - } - } - }); - } - - private boolean isRepeatedlyFailingTooMuch() { - Integer failOnNumRecurringFailures = getConfig(FAIL_ON_NUM_RECURRING_FAILURES); - long failOnRecurringFailuresInThisDuration = getConfig(FAIL_ON_RECURRING_FAILURES_IN_THIS_DURATION); - long oldestPermitted = currentTimeMillis() - failOnRecurringFailuresInThisDuration; - - // trim old ones - for (Iterator<Long> iter = consecutiveReplacementFailureTimes.iterator(); iter.hasNext();) { - Long timestamp = iter.next(); - if (timestamp < oldestPermitted) { - iter.remove(); - } else { - break; - } - } - - return (consecutiveReplacementFailureTimes.size() >= failOnNumRecurringFailures); - } - - protected long currentTimeMillis() { - Ticker ticker = getConfig(TICKER); - return (ticker == null) ? System.currentTimeMillis() : TimeUnit.NANOSECONDS.toMillis(ticker.read()); - } - - protected void onReplacementFailed(String msg) { - LOG.warn("ServiceReplacer failed for "+entity+": "+msg); - consecutiveReplacementFailureTimes.add(currentTimeMillis()); - - if (getConfig(SET_ON_FIRE_ON_FAILURE)) { - ServiceProblemsLogic.updateProblemsIndicator(entity, "ServiceReplacer", "replacement failed: "+msg); - } - entity.emit(ENTITY_REPLACEMENT_FAILED, new FailureDescriptor(entity, msg)); - } -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/brooklyn/policy/ha/ServiceRestarter.java ---------------------------------------------------------------------- diff --git a/policy/src/main/java/brooklyn/policy/ha/ServiceRestarter.java b/policy/src/main/java/brooklyn/policy/ha/ServiceRestarter.java deleted file mode 100644 index ab1359d..0000000 --- a/policy/src/main/java/brooklyn/policy/ha/ServiceRestarter.java +++ /dev/null @@ -1,163 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.policy.ha; - -import java.util.Map; -import java.util.concurrent.atomic.AtomicReference; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.brooklyn.api.catalog.Catalog; -import org.apache.brooklyn.api.entity.basic.EntityLocal; -import org.apache.brooklyn.api.event.Sensor; -import org.apache.brooklyn.api.event.SensorEvent; -import org.apache.brooklyn.api.event.SensorEventListener; -import org.apache.brooklyn.core.policy.basic.AbstractPolicy; -import org.apache.brooklyn.core.util.config.ConfigBag; -import org.apache.brooklyn.core.util.flags.SetFromFlag; - -import brooklyn.config.ConfigKey; -import brooklyn.entity.basic.ConfigKeys; -import brooklyn.entity.basic.Entities; -import brooklyn.entity.basic.EntityInternal; -import brooklyn.entity.basic.Lifecycle; -import brooklyn.entity.basic.ServiceStateLogic; -import brooklyn.entity.trait.Startable; -import brooklyn.event.basic.BasicNotificationSensor; -import brooklyn.policy.ha.HASensors.FailureDescriptor; -import brooklyn.util.collections.MutableMap; -import brooklyn.util.javalang.JavaClassNames; -import brooklyn.util.time.Duration; -import brooklyn.util.time.Time; - -import com.google.common.base.Preconditions; - -/** attaches to a SoftwareProcess (or anything Startable, emitting ENTITY_FAILED or other configurable sensor), - * and invokes restart on failure; - * if there is a subsequent failure within a configurable time interval, or if the restart fails, - * this gives up and emits {@link #ENTITY_RESTART_FAILED} - */ -@Catalog(name="Service Restarter", description="HA policy for restarting a service automatically, " - + "and for emitting an events if the service repeatedly fails") -public class ServiceRestarter extends AbstractPolicy { - - private static final Logger LOG = LoggerFactory.getLogger(ServiceRestarter.class); - - public static final BasicNotificationSensor<FailureDescriptor> ENTITY_RESTART_FAILED = new BasicNotificationSensor<FailureDescriptor>( - FailureDescriptor.class, "ha.entityFailed.restart", "Indicates that an entity restart attempt has failed"); - - /** skips retry if a failure re-occurs within this time interval */ - @SetFromFlag("failOnRecurringFailuresInThisDuration") - public static final ConfigKey<Duration> FAIL_ON_RECURRING_FAILURES_IN_THIS_DURATION = ConfigKeys.newConfigKey( - Duration.class, - "failOnRecurringFailuresInThisDuration", - "Reports entity as failed if it fails two or more times in this time window", - Duration.minutes(3)); - - @SetFromFlag("setOnFireOnFailure") - public static final ConfigKey<Boolean> SET_ON_FIRE_ON_FAILURE = ConfigKeys.newBooleanConfigKey("setOnFireOnFailure", "", true); - - /** monitors this sensor, by default ENTITY_FAILED */ - @SetFromFlag("failureSensorToMonitor") - @SuppressWarnings({ "rawtypes", "unchecked" }) - public static final ConfigKey<Sensor<?>> FAILURE_SENSOR_TO_MONITOR = (ConfigKey) ConfigKeys.newConfigKey(Sensor.class, "failureSensorToMonitor", "", HASensors.ENTITY_FAILED); - - protected final AtomicReference<Long> lastFailureTime = new AtomicReference<Long>(); - - public ServiceRestarter() { - this(new ConfigBag()); - } - - public ServiceRestarter(Map<String,?> flags) { - this(new ConfigBag().putAll(flags)); - } - - public ServiceRestarter(ConfigBag configBag) { - // TODO hierarchy should use ConfigBag, and not change flags - super(configBag.getAllConfigMutable()); - uniqueTag = JavaClassNames.simpleClassName(getClass())+":"+getConfig(FAILURE_SENSOR_TO_MONITOR).getName(); - } - - public ServiceRestarter(Sensor<?> failureSensorToMonitor) { - this(new ConfigBag().configure(FAILURE_SENSOR_TO_MONITOR, failureSensorToMonitor)); - } - - @Override - public void setEntity(final EntityLocal entity) { - Preconditions.checkArgument(entity instanceof Startable, "Restarter must take a Startable, not "+entity); - - super.setEntity(entity); - - subscribe(entity, getConfig(FAILURE_SENSOR_TO_MONITOR), new SensorEventListener<Object>() { - @Override public void onEvent(final SensorEvent<Object> event) { - // Must execute in another thread - if we called entity.restart in the event-listener's thread - // then we'd block all other events being delivered to this entity's other subscribers. - // Relies on synchronization of `onDetectedFailure`. - // See same pattern used in ServiceReplacer. - - // TODO Could use BasicExecutionManager.setTaskSchedulerForTag to prevent race of two - // events being received in rapid succession, and onDetectedFailure being executed out-of-order - // for them; or could write events to a blocking queue and have onDetectedFailure read from that. - - if (isRunning()) { - LOG.info("ServiceRestarter notified; dispatching job for "+entity+" ("+event.getValue()+")"); - ((EntityInternal)entity).getExecutionContext().submit(MutableMap.of(), new Runnable() { - @Override public void run() { - onDetectedFailure(event); - }}); - } else { - LOG.warn("ServiceRestarter not running, so not acting on failure detected at "+entity+" ("+event.getValue()+")"); - } - } - }); - } - - // TODO semaphores would be better to allow at-most-one-blocking behaviour - // FIXME as this is called in message-dispatch (single threaded) we should do most of this in a new submitted task - // (as has been done in ServiceReplacer) - protected synchronized void onDetectedFailure(SensorEvent<Object> event) { - if (isSuspended()) { - LOG.warn("ServiceRestarter suspended, so not acting on failure detected at "+entity+" ("+event.getValue()+")"); - return; - } - - LOG.warn("ServiceRestarter acting on failure detected at "+entity+" ("+event.getValue()+")"); - long current = System.currentTimeMillis(); - Long last = lastFailureTime.getAndSet(current); - long elapsed = last==null ? -1 : current-last; - if (elapsed>=0 && elapsed <= getConfig(FAIL_ON_RECURRING_FAILURES_IN_THIS_DURATION).toMilliseconds()) { - onRestartFailed("Restart failure (failed again after "+Time.makeTimeStringRounded(elapsed)+") at "+entity+": "+event.getValue()); - return; - } - try { - ServiceStateLogic.setExpectedState(entity, Lifecycle.STARTING); - Entities.invokeEffector(entity, entity, Startable.RESTART).get(); - } catch (Exception e) { - onRestartFailed("Restart failure (error "+e+") at "+entity+": "+event.getValue()); - } - } - - protected void onRestartFailed(String msg) { - LOG.warn("ServiceRestarter failed for "+entity+": "+msg); - if (getConfig(SET_ON_FIRE_ON_FAILURE)) { - ServiceStateLogic.setExpectedState(entity, Lifecycle.ON_FIRE); - } - entity.emit(ENTITY_RESTART_FAILED, new FailureDescriptor(entity, msg)); - } -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/brooklyn/policy/ha/SshMachineFailureDetector.java ---------------------------------------------------------------------- diff --git a/policy/src/main/java/brooklyn/policy/ha/SshMachineFailureDetector.java b/policy/src/main/java/brooklyn/policy/ha/SshMachineFailureDetector.java deleted file mode 100644 index 1fa9982..0000000 --- a/policy/src/main/java/brooklyn/policy/ha/SshMachineFailureDetector.java +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.policy.ha; - -import java.util.Map; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.brooklyn.api.catalog.Catalog; -import org.apache.brooklyn.core.util.internal.ssh.SshTool; - -import brooklyn.config.ConfigKey; -import brooklyn.entity.basic.ConfigKeys; -import brooklyn.event.basic.BasicNotificationSensor; - -import org.apache.brooklyn.location.basic.Machines; -import org.apache.brooklyn.location.basic.SshMachineLocation; - -import brooklyn.policy.ha.HASensors.FailureDescriptor; -import brooklyn.util.exceptions.Exceptions; -import brooklyn.util.guava.Maybe; -import brooklyn.util.time.Duration; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; - -@Catalog(name="Ssh Connectivity Failure Detector", description="HA policy for monitoring an SshMachine, " - + "emitting an event if the connection is lost/restored") -public class SshMachineFailureDetector extends AbstractFailureDetector { - private static final Logger LOG = LoggerFactory.getLogger(SshMachineFailureDetector.class); - public static final String DEFAULT_UNIQUE_TAG = "failureDetector.sshMachine.tag"; - - public static final BasicNotificationSensor<FailureDescriptor> CONNECTION_FAILED = HASensors.CONNECTION_FAILED; - - public static final BasicNotificationSensor<FailureDescriptor> CONNECTION_RECOVERED = HASensors.CONNECTION_RECOVERED; - - public static final ConfigKey<Duration> CONNECT_TIMEOUT = ConfigKeys.newDurationConfigKey( - "ha.sshConnection.timeout", "How long to wait for conneciton before declaring failure", Duration.TEN_SECONDS); - - @Override - public void init() { - super.init(); - if (config().getRaw(SENSOR_FAILED).isAbsent()) { - config().set(SENSOR_FAILED, CONNECTION_FAILED); - } - if (config().getRaw(SENSOR_RECOVERED).isAbsent()) { - config().set(SENSOR_RECOVERED, CONNECTION_RECOVERED); - } - if (config().getRaw(POLL_PERIOD).isAbsent()) { - config().set(POLL_PERIOD, Duration.ONE_MINUTE); - } - uniqueTag = DEFAULT_UNIQUE_TAG; - } - - @Override - protected CalculatedStatus calculateStatus() { - Maybe<SshMachineLocation> sshMachineOption = Machines.findUniqueSshMachineLocation(entity.getLocations()); - if (sshMachineOption.isPresent()) { - SshMachineLocation sshMachine = sshMachineOption.get(); - try { - Duration timeout = config().get(CONNECT_TIMEOUT); - Map<String, ?> flags = ImmutableMap.of( - SshTool.PROP_CONNECT_TIMEOUT.getName(), timeout.toMilliseconds(), - SshTool.PROP_SESSION_TIMEOUT.getName(), timeout.toMilliseconds(), - SshTool.PROP_SSH_TRIES.getName(), 1); - int exitCode = sshMachine.execCommands(flags, SshMachineFailureDetector.class.getName(), ImmutableList.of("exit")); - return new BasicCalculatedStatus(exitCode == 0, sshMachine.toString()); - } catch (Exception e) { - Exceptions.propagateIfFatal(e); - boolean isFirstFailure = lastPublished != LastPublished.FAILED && currentFailureStartTime == null; - if (isFirstFailure) { - if (LOG.isDebugEnabled()) { - LOG.debug("Failed connecting to machine " + sshMachine, e); - } - } else { - if (LOG.isTraceEnabled()) { - LOG.trace("Failed connecting to machine " + sshMachine, e); - } - } - return new BasicCalculatedStatus(false, e.getMessage()); - } - } else { - return new BasicCalculatedStatus(true, "no machine started, not complaining"); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/brooklyn/policy/loadbalancing/BalanceableContainer.java ---------------------------------------------------------------------- diff --git a/policy/src/main/java/brooklyn/policy/loadbalancing/BalanceableContainer.java b/policy/src/main/java/brooklyn/policy/loadbalancing/BalanceableContainer.java deleted file mode 100644 index 0039685..0000000 --- a/policy/src/main/java/brooklyn/policy/loadbalancing/BalanceableContainer.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.policy.loadbalancing; - -import java.util.Set; - -import org.apache.brooklyn.api.entity.Entity; - -import brooklyn.config.ConfigKey; -import brooklyn.entity.basic.AbstractGroup; -import brooklyn.entity.basic.ConfigKeys; -import brooklyn.event.basic.BasicNotificationSensor; -import brooklyn.util.collections.QuorumCheck; -import brooklyn.util.collections.QuorumCheck.QuorumChecks; - -/** - * Contains worker items that can be moved between this container and others to effect load balancing. - * Membership of a balanceable container does not imply a parent-child relationship in the Brooklyn - * management sense. - */ -public interface BalanceableContainer<ItemType extends Movable> extends Entity, AbstractGroup { - - public static BasicNotificationSensor<Entity> ITEM_ADDED = new BasicNotificationSensor<Entity>( - Entity.class, "balanceablecontainer.item.added", "Movable item added to balanceable container"); - public static BasicNotificationSensor<Entity> ITEM_REMOVED = new BasicNotificationSensor<Entity>( - Entity.class, "balanceablecontainer.item.removed", "Movable item removed from balanceable container"); - - public static final ConfigKey<QuorumCheck> UP_QUORUM_CHECK = ConfigKeys.newConfigKeyWithDefault(AbstractGroup.UP_QUORUM_CHECK, - "Up check from members; default one for container overrides usual check to always return true, " - + "i.e. not block service up simply because the container is empty or something in the container has failed", - QuorumChecks.alwaysTrue()); - - public Set<ItemType> getBalanceableItems(); - -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/brooklyn/policy/loadbalancing/BalanceablePoolModel.java ---------------------------------------------------------------------- diff --git a/policy/src/main/java/brooklyn/policy/loadbalancing/BalanceablePoolModel.java b/policy/src/main/java/brooklyn/policy/loadbalancing/BalanceablePoolModel.java deleted file mode 100644 index 33f9e0b..0000000 --- a/policy/src/main/java/brooklyn/policy/loadbalancing/BalanceablePoolModel.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.policy.loadbalancing; - -import java.util.Map; -import java.util.Set; - -import org.apache.brooklyn.api.location.Location; - -/** - * Captures the state of a balanceable cluster of containers and all their constituent items, including workrates, - * for consumption by a {@link BalancingStrategy}. - */ -public interface BalanceablePoolModel<ContainerType, ItemType> { - - // Attributes of the pool. - public String getName(); - public int getPoolSize(); - public Set<ContainerType> getPoolContents(); - public double getPoolLowThreshold(); - public double getPoolHighThreshold(); - public double getCurrentPoolWorkrate(); - public boolean isHot(); - public boolean isCold(); - - - // Attributes of containers and items. - public String getName(ContainerType container); - public Location getLocation(ContainerType container); - public double getLowThreshold(ContainerType container); // -1 for not known / invalid - public double getHighThreshold(ContainerType container); // -1 for not known / invalid - public double getTotalWorkrate(ContainerType container); // -1 for not known / invalid - public Map<ContainerType, Double> getContainerWorkrates(); // contains -1 for items which are unknown - /** contains -1 instead of actual item workrate, for items which cannot be moved */ - // @Nullable("null if the node is prevented from reporting and/or being adjusted, or has no data yet") - public Map<ItemType, Double> getItemWorkrates(ContainerType container); - public boolean isItemMoveable(ItemType item); - public boolean isItemAllowedIn(ItemType item, Location location); - - // Mutators for keeping the model in-sync with the observed world - public void onContainerAdded(ContainerType newContainer, double lowThreshold, double highThreshold); - public void onContainerRemoved(ContainerType oldContainer); - public void onItemAdded(ItemType item, ContainerType parentContainer); - public void onItemAdded(ItemType item, ContainerType parentContainer, boolean immovable); - public void onItemRemoved(ItemType item); - public void onItemWorkrateUpdated(ItemType item, double newValue); - public void onItemMoved(ItemType item, ContainerType targetContainer); -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/brooklyn/policy/loadbalancing/BalanceableWorkerPool.java ---------------------------------------------------------------------- diff --git a/policy/src/main/java/brooklyn/policy/loadbalancing/BalanceableWorkerPool.java b/policy/src/main/java/brooklyn/policy/loadbalancing/BalanceableWorkerPool.java deleted file mode 100644 index c8d3a8b..0000000 --- a/policy/src/main/java/brooklyn/policy/loadbalancing/BalanceableWorkerPool.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.policy.loadbalancing; - -import static com.google.common.base.Preconditions.checkNotNull; - -import java.io.Serializable; - -import org.apache.brooklyn.api.entity.Entity; -import org.apache.brooklyn.api.entity.Group; -import org.apache.brooklyn.api.entity.proxying.ImplementedBy; - -import brooklyn.entity.trait.Resizable; -import brooklyn.event.basic.BasicNotificationSensor; - -/** - * Represents an elastic group of "container" entities, each of which is capable of hosting "item" entities that perform - * work and consume the container's available resources (e.g. CPU or bandwidth). Auto-scaling and load-balancing policies can - * be attached to this pool to provide dynamic elasticity based on workrates reported by the individual item entities. - * <p> - * The containers must be "up" in order to receive work, thus they must NOT follow the default enricher pattern - * for groups which says that the group must be up to receive work. - */ -@ImplementedBy(BalanceableWorkerPoolImpl.class) -public interface BalanceableWorkerPool extends Entity, Resizable { - - // FIXME Asymmetry between loadbalancing and followTheSun: ITEM_ADDED and ITEM_REMOVED in loadbalancing - // are of type ContainerItemPair, but in followTheSun it is just the `Entity item`. - - /** Encapsulates an item and a container; emitted for {@code ITEM_ADDED}, {@code ITEM_REMOVED} and - * {@code ITEM_MOVED} sensors. - */ - public static class ContainerItemPair implements Serializable { - private static final long serialVersionUID = 1L; - public final BalanceableContainer<?> container; - public final Entity item; - - public ContainerItemPair(BalanceableContainer<?> container, Entity item) { - this.container = container; - this.item = checkNotNull(item); - } - - @Override - public String toString() { - return ""+item+" @ "+container; - } - } - - // Pool constituent notifications. - public static BasicNotificationSensor<Entity> CONTAINER_ADDED = new BasicNotificationSensor<Entity>( - Entity.class, "balanceablepool.container.added", "Container added to balanceable pool"); - public static BasicNotificationSensor<Entity> CONTAINER_REMOVED = new BasicNotificationSensor<Entity>( - Entity.class, "balanceablepool.container.removed", "Container removed from balanceable pool"); - public static BasicNotificationSensor<ContainerItemPair> ITEM_ADDED = new BasicNotificationSensor<ContainerItemPair>( - ContainerItemPair.class, "balanceablepool.item.added", "Item added to balanceable pool"); - public static BasicNotificationSensor<ContainerItemPair> ITEM_REMOVED = new BasicNotificationSensor<ContainerItemPair>( - ContainerItemPair.class, "balanceablepool.item.removed", "Item removed from balanceable pool"); - public static BasicNotificationSensor<ContainerItemPair> ITEM_MOVED = new BasicNotificationSensor<ContainerItemPair>( - ContainerItemPair.class, "balanceablepool.item.moved", "Item moved in balanceable pool to the given container"); - - public void setResizable(Resizable resizable); - - public void setContents(Group containerGroup, Group itemGroup); - - public Group getContainerGroup(); - - public Group getItemGroup(); -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/brooklyn/policy/loadbalancing/BalanceableWorkerPoolImpl.java ---------------------------------------------------------------------- diff --git a/policy/src/main/java/brooklyn/policy/loadbalancing/BalanceableWorkerPoolImpl.java b/policy/src/main/java/brooklyn/policy/loadbalancing/BalanceableWorkerPoolImpl.java deleted file mode 100644 index b3f9633..0000000 --- a/policy/src/main/java/brooklyn/policy/loadbalancing/BalanceableWorkerPoolImpl.java +++ /dev/null @@ -1,185 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.policy.loadbalancing; - -import java.util.Collections; -import java.util.HashSet; -import java.util.Set; - -import org.apache.brooklyn.api.entity.Entity; -import org.apache.brooklyn.api.entity.Group; -import org.apache.brooklyn.api.event.Sensor; -import org.apache.brooklyn.api.event.SensorEvent; -import org.apache.brooklyn.api.event.SensorEventListener; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import brooklyn.entity.basic.AbstractEntity; -import brooklyn.entity.basic.AbstractGroup; -import brooklyn.entity.trait.Resizable; -import brooklyn.entity.trait.Startable; - -/** - * @see BalanceableWorkerPool - */ -public class BalanceableWorkerPoolImpl extends AbstractEntity implements BalanceableWorkerPool { - - // FIXME Asymmetry between loadbalancing and followTheSun: ITEM_ADDED and ITEM_REMOVED in loadbalancing - // are of type ContainerItemPair, but in followTheSun it is just the `Entity item`. - - private static final Logger LOG = LoggerFactory.getLogger(BalanceableWorkerPool.class); - - private Group containerGroup; - private Group itemGroup; - private Resizable resizable; - - private final Set<Entity> containers = Collections.synchronizedSet(new HashSet<Entity>()); - private final Set<Entity> items = Collections.synchronizedSet(new HashSet<Entity>()); - - private final SensorEventListener<Object> eventHandler = new SensorEventListener<Object>() { - @Override - public void onEvent(SensorEvent<Object> event) { - if (LOG.isTraceEnabled()) LOG.trace("{} received event {}", BalanceableWorkerPoolImpl.this, event); - Entity source = event.getSource(); - Object value = event.getValue(); - Sensor<?> sensor = event.getSensor(); - - if (sensor.equals(AbstractGroup.MEMBER_ADDED)) { - if (source.equals(containerGroup)) { - onContainerAdded((BalanceableContainer<?>) value); - } else if (source.equals(itemGroup)) { - onItemAdded((Entity)value); - } else { - throw new IllegalStateException("unexpected event source="+source); - } - } else if (sensor.equals(AbstractGroup.MEMBER_REMOVED)) { - if (source.equals(containerGroup)) { - onContainerRemoved((BalanceableContainer<?>) value); - } else if (source.equals(itemGroup)) { - onItemRemoved((Entity) value); - } else { - throw new IllegalStateException("unexpected event source="+source); - } - } else if (sensor.equals(Startable.SERVICE_UP)) { - // TODO What if start has failed? Is there a sensor to indicate that? - if ((Boolean)value) { - onContainerUp((BalanceableContainer<?>) source); - } else { - onContainerDown((BalanceableContainer<?>) source); - } - } else if (sensor.equals(Movable.CONTAINER)) { - onItemMoved(source, (BalanceableContainer<?>) value); - } else { - throw new IllegalStateException("Unhandled event type "+sensor+": "+event); - } - } - }; - - public BalanceableWorkerPoolImpl() { - } - - @Override - public void setResizable(Resizable resizable) { - this.resizable = resizable; - } - - @Override - public void setContents(Group containerGroup, Group itemGroup) { - this.containerGroup = containerGroup; - this.itemGroup = itemGroup; - if (resizable == null && containerGroup instanceof Resizable) resizable = (Resizable) containerGroup; - - subscribe(containerGroup, AbstractGroup.MEMBER_ADDED, eventHandler); - subscribe(containerGroup, AbstractGroup.MEMBER_REMOVED, eventHandler); - subscribe(itemGroup, AbstractGroup.MEMBER_ADDED, eventHandler); - subscribe(itemGroup, AbstractGroup.MEMBER_REMOVED, eventHandler); - - // Process extant containers and items - for (Entity existingContainer : containerGroup.getMembers()) { - onContainerAdded((BalanceableContainer<?>)existingContainer); - } - for (Entity existingItem : itemGroup.getMembers()) { - onItemAdded(existingItem); - } - } - - @Override - public Group getContainerGroup() { - return containerGroup; - } - - @Override - public Group getItemGroup() { - return itemGroup; - } - - @Override - public Integer getCurrentSize() { - return containerGroup.getCurrentSize(); - } - - @Override - public Integer resize(Integer desiredSize) { - if (resizable != null) return resizable.resize(desiredSize); - - throw new UnsupportedOperationException("Container group is not resizable, and no resizable supplied: "+containerGroup+" of type "+(containerGroup != null ? containerGroup.getClass().getCanonicalName() : null)); - } - - private void onContainerAdded(BalanceableContainer<?> newContainer) { - subscribe(newContainer, Startable.SERVICE_UP, eventHandler); - if (!(newContainer instanceof Startable) || Boolean.TRUE.equals(newContainer.getAttribute(Startable.SERVICE_UP))) { - onContainerUp(newContainer); - } - } - - private void onContainerUp(BalanceableContainer<?> newContainer) { - if (containers.add(newContainer)) { - emit(CONTAINER_ADDED, newContainer); - } - } - - private void onContainerDown(BalanceableContainer<?> oldContainer) { - if (containers.remove(oldContainer)) { - emit(CONTAINER_REMOVED, oldContainer); - } - } - - private void onContainerRemoved(BalanceableContainer<?> oldContainer) { - unsubscribe(oldContainer); - onContainerDown(oldContainer); - } - - private void onItemAdded(Entity item) { - if (items.add(item)) { - subscribe(item, Movable.CONTAINER, eventHandler); - emit(ITEM_ADDED, new ContainerItemPair(item.getAttribute(Movable.CONTAINER), item)); - } - } - - private void onItemRemoved(Entity item) { - if (items.remove(item)) { - unsubscribe(item); - emit(ITEM_REMOVED, new ContainerItemPair(null, item)); - } - } - - private void onItemMoved(Entity item, BalanceableContainer<?> container) { - emit(ITEM_MOVED, new ContainerItemPair(container, item)); - } -}
