http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/brooklyn/policy/autoscaling/SizeHistory.java ---------------------------------------------------------------------- diff --git a/policy/src/main/java/brooklyn/policy/autoscaling/SizeHistory.java b/policy/src/main/java/brooklyn/policy/autoscaling/SizeHistory.java deleted file mode 100644 index 525ba20..0000000 --- a/policy/src/main/java/brooklyn/policy/autoscaling/SizeHistory.java +++ /dev/null @@ -1,166 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.policy.autoscaling; - -import java.util.List; - -import brooklyn.util.collections.MutableMap; -import brooklyn.util.collections.TimeWindowedList; -import brooklyn.util.collections.TimestampedValue; -import brooklyn.util.time.Duration; - -import com.google.common.base.Objects; - -/** - * Using a {@link TimeWindowedList}, tracks the recent history of values to allow a summary of - * those values to be obtained. 
- * - * @author aled - */ -public class SizeHistory { - - public static class WindowSummary { - /** The most recent value (or -1 if there has been no value) */ - public final long latest; - - /** The minimum vaule within the given time period */ - public final long min; - - /** The maximum vaule within the given time period */ - public final long max; - - /** true if, since that max value, there have not been any higher values */ - public final boolean stableForGrowth; - - /** true if, since that low value, there have not been any lower values */ - public final boolean stableForShrinking; - - public WindowSummary(long latest, long min, long max, boolean stableForGrowth, boolean stableForShrinking) { - this.latest = latest; - this.min = min; - this.max = max; - this.stableForGrowth = stableForGrowth; - this.stableForShrinking = stableForShrinking; - } - - @Override - public String toString() { - return Objects.toStringHelper(this).add("latest", latest).add("min", min).add("max", max) - .add("stableForGrowth", stableForGrowth).add("stableForShrinking", stableForShrinking).toString(); - } - } - - private final TimeWindowedList<Number> recentDesiredResizes; - - public SizeHistory(long windowSize) { - recentDesiredResizes = new TimeWindowedList<Number>(MutableMap.of("timePeriod", windowSize, "minExpiredVals", 1)); - } - - public void add(final int val) { - recentDesiredResizes.add(val); - } - - public void setWindowSize(Duration newWindowSize) { - recentDesiredResizes.setTimePeriod(newWindowSize); - } - - /** - * Summarises the history of values in this time window, with a few special things: - * <ul> - * <li>If entire time-window is not covered by the given values, then min is Integer.MIN_VALUE and max is Integer.MAX_VALUE - * <li>If no values, then latest is -1 - * <li>If no recent values, then keeps last-seen value (no matter how old), to use that - * <li>"stable for growth" means that since that max value, there have not been any higher values - * <li>"stable for 
shrinking" means that since that low value, there have not been any lower values - * </ul> - */ - public WindowSummary summarizeWindow(Duration windowSize) { - long now = System.currentTimeMillis(); - List<TimestampedValue<Number>> windowVals = recentDesiredResizes.getValuesInWindow(now, windowSize); - - Number latestObj = latestInWindow(windowVals); - long latest = (latestObj == null) ? -1: latestObj.longValue(); - long max = maxInWindow(windowVals, windowSize).longValue(); - long min = minInWindow(windowVals, windowSize).longValue(); - - // TODO Could do more sophisticated "stable" check; this is the easiest code - correct but not most efficient - // in terms of the caller having to schedule additional stability checks. - boolean stable = (min == max); - - return new WindowSummary(latest, min, max, stable, stable); - } - - /** - * If the entire time-window is not covered by the given values, then returns Integer.MAX_VALUE. - */ - private <T extends Number> T maxInWindow(List<TimestampedValue<T>> vals, Duration timeWindow) { - // TODO bad casting from Integer default result to T - long now = System.currentTimeMillis(); - long epoch = now - timeWindow.toMilliseconds(); - T result = null; - double resultAsDouble = Integer.MAX_VALUE; - for (TimestampedValue<T> val : vals) { - T valAsNum = val.getValue(); - double valAsDouble = (valAsNum != null) ? 
valAsNum.doubleValue() : 0; - if (result == null && val.getTimestamp() > epoch) { - result = withDefault(null, Integer.MAX_VALUE); - resultAsDouble = result.doubleValue(); - } - if (result == null || (valAsNum != null && valAsDouble > resultAsDouble)) { - result = valAsNum; - resultAsDouble = valAsDouble; - } - } - return withDefault(result, Integer.MAX_VALUE); - } - - /** - * If the entire time-window is not covered by the given values, then returns Integer.MIN_VALUE - */ - private <T extends Number> T minInWindow(List<TimestampedValue<T>> vals, Duration timeWindow) { - long now = System.currentTimeMillis(); - long epoch = now - timeWindow.toMilliseconds(); - T result = null; - double resultAsDouble = Integer.MIN_VALUE; - for (TimestampedValue<T> val : vals) { - T valAsNum = val.getValue(); - double valAsDouble = (valAsNum != null) ? valAsNum.doubleValue() : 0; - if (result == null && val.getTimestamp() > epoch) { - result = withDefault(null, Integer.MIN_VALUE); - resultAsDouble = result.doubleValue(); - } - if (result == null || (val.getValue() != null && valAsDouble < resultAsDouble)) { - result = valAsNum; - resultAsDouble = valAsDouble; - } - } - return withDefault(result, Integer.MIN_VALUE); - } - - @SuppressWarnings("unchecked") - private <T> T withDefault(T result, Integer defaultValue) { - return result!=null ? result : (T) defaultValue; - } - /** - * @return null if empty, or the most recent value - */ - private <T extends Number> T latestInWindow(List<TimestampedValue<T>> vals) { - return vals.isEmpty() ? null : vals.get(vals.size()-1).getValue(); - } -}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/brooklyn/policy/followthesun/DefaultFollowTheSunModel.java ---------------------------------------------------------------------- diff --git a/policy/src/main/java/brooklyn/policy/followthesun/DefaultFollowTheSunModel.java b/policy/src/main/java/brooklyn/policy/followthesun/DefaultFollowTheSunModel.java deleted file mode 100644 index 922b33d..0000000 --- a/policy/src/main/java/brooklyn/policy/followthesun/DefaultFollowTheSunModel.java +++ /dev/null @@ -1,328 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package brooklyn.policy.followthesun; - -import static com.google.common.base.Preconditions.checkNotNull; - -import java.io.ByteArrayOutputStream; -import java.io.PrintStream; -import java.util.Collection; -import java.util.Collections; -import java.util.LinkedHashMap; -import java.util.LinkedHashSet; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.brooklyn.api.location.Location; -import org.apache.brooklyn.location.basic.AbstractLocation; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Maps; - -public class DefaultFollowTheSunModel<ContainerType, ItemType> implements FollowTheSunModel<ContainerType, ItemType> { - - private static final Logger LOG = LoggerFactory.getLogger(DefaultFollowTheSunModel.class); - - // Concurrent maps cannot have null value; use this to represent when no container is supplied for an item - private static final String NULL = "null-val"; - private static final Location NULL_LOCATION = new AbstractLocation(newHashMap("name","null-location")) {}; - - private final String name; - private final Set<ContainerType> containers = Collections.newSetFromMap(new ConcurrentHashMap<ContainerType,Boolean>()); - private final Map<ItemType, ContainerType> itemToContainer = new ConcurrentHashMap<ItemType, ContainerType>(); - private final Map<ContainerType, Location> containerToLocation = new ConcurrentHashMap<ContainerType, Location>(); - private final Map<ItemType, Location> itemToLocation = new ConcurrentHashMap<ItemType, Location>(); - private final Map<ItemType, Map<? extends ItemType, Double>> itemUsage = new ConcurrentHashMap<ItemType, Map<? 
extends ItemType,Double>>(); - private final Set<ItemType> immovableItems = Collections.newSetFromMap(new ConcurrentHashMap<ItemType, Boolean>()); - - public DefaultFollowTheSunModel(String name) { - this.name = name; - } - - @Override - public Set<ItemType> getItems() { - return itemToContainer.keySet(); - } - - @Override - public ContainerType getItemContainer(ItemType item) { - ContainerType result = itemToContainer.get(item); - return (isNull(result) ? null : result); - } - - @Override - public Location getItemLocation(ItemType item) { - Location result = itemToLocation.get(item); - return (isNull(result) ? null : result); - } - - @Override - public Location getContainerLocation(ContainerType container) { - Location result = containerToLocation.get(container); - return (isNull(result) ? null : result); - } - - // Provider methods. - - @Override public String getName() { - return name; - } - - // TODO: delete? - @Override public String getName(ItemType item) { - return item.toString(); - } - - @Override public boolean isItemMoveable(ItemType item) { - // If don't know about item, then assume not movable; otherwise has this item been explicitly flagged as immovable? - return hasItem(item) && !immovableItems.contains(item); - } - - @Override public boolean isItemAllowedIn(ItemType item, Location location) { - return true; // TODO? - } - - @Override public boolean hasActiveMigration(ItemType item) { - return false; // TODO? - } - - @Override - // FIXME Too expensive to compute; store in a different data structure? - public Map<ItemType, Map<Location, Double>> getDirectSendsToItemByLocation() { - Map<ItemType, Map<Location, Double>> result = new LinkedHashMap<ItemType, Map<Location,Double>>(getNumItems()); - - for (Map.Entry<ItemType, Map<? extends ItemType, Double>> entry : itemUsage.entrySet()) { - ItemType targetItem = entry.getKey(); - Map<? 
extends ItemType, Double> sources = entry.getValue(); - if (sources.isEmpty()) continue; // no-one talking to us - - Map<Location, Double> targetUsageByLocation = new LinkedHashMap<Location, Double>(); - result.put(targetItem, targetUsageByLocation); - - for (Map.Entry<? extends ItemType, Double> entry2 : sources.entrySet()) { - ItemType sourceItem = entry2.getKey(); - Location sourceLocation = getItemLocation(sourceItem); - double usageVal = (entry.getValue() != null) ? entry2.getValue() : 0d; - if (sourceLocation == null) continue; // don't know where to attribute this load; e.g. item may have just terminated - if (sourceItem.equals(targetItem)) continue; // ignore msgs to self - - Double usageValTotal = targetUsageByLocation.get(sourceLocation); - double newUsageValTotal = (usageValTotal != null ? usageValTotal : 0d) + usageVal; - targetUsageByLocation.put(sourceLocation, newUsageValTotal); - } - } - - return result; - } - - @Override - public Set<ContainerType> getAvailableContainersFor(ItemType item, Location location) { - checkNotNull(location); - return getContainersInLocation(location); - } - - - // Mutators. - - @Override - public void onItemMoved(ItemType item, ContainerType newContainer) { - // idempotent, as may be called multiple times - Location newLocation = (newContainer != null) ? 
containerToLocation.get(newContainer) : null; - ContainerType newContainerNonNull = toNonNullContainer(newContainer); - Location newLocationNonNull = toNonNullLocation(newLocation); - ContainerType oldContainer = itemToContainer.put(item, newContainerNonNull); - Location oldLocation = itemToLocation.put(item, newLocationNonNull); - } - - @Override - public void onContainerAdded(ContainerType container, Location location) { - Location locationNonNull = toNonNullLocation(location); - containers.add(container); - containerToLocation.put(container, locationNonNull); - for (ItemType item : getItemsOnContainer(container)) { - itemToLocation.put(item, locationNonNull); - } - } - - @Override - public void onContainerRemoved(ContainerType container) { - containers.remove(container); - containerToLocation.remove(container); - } - - public void onContainerLocationUpdated(ContainerType container, Location location) { - if (!containers.contains(container)) { - // unknown container; probably just stopped? - // If this overtook onContainerAdded, then assume we'll lookup the location and get it right in onContainerAdded - if (LOG.isDebugEnabled()) LOG.debug("Ignoring setting of location for unknown container {}, to {}", container, location); - return; - } - Location locationNonNull = toNonNullLocation(location); - containerToLocation.put(container, locationNonNull); - for (ItemType item : getItemsOnContainer(container)) { - itemToLocation.put(item, locationNonNull); - } - } - - @Override - public void onItemAdded(ItemType item, ContainerType container, boolean immovable) { - // idempotent, as may be called multiple times - - if (immovable) { - immovableItems.add(item); - } - Location location = (container != null) ? 
containerToLocation.get(container) : null; - ContainerType containerNonNull = toNonNullContainer(container); - Location locationNonNull = toNonNullLocation(location); - ContainerType oldContainer = itemToContainer.put(item, containerNonNull); - Location oldLocation = itemToLocation.put(item, locationNonNull); - } - - @Override - public void onItemRemoved(ItemType item) { - itemToContainer.remove(item); - itemToLocation.remove(item); - itemUsage.remove(item); - immovableItems.remove(item); - } - - @Override - public void onItemUsageUpdated(ItemType item, Map<? extends ItemType, Double> newValue) { - if (hasItem(item)) { - itemUsage.put(item, newValue); - } else { - // Can happen when item removed - get notification of removal and workrate from group and item - // respectively, so can overtake each other - if (LOG.isDebugEnabled()) LOG.debug("Ignoring setting of usage for unknown item {}, to {}", item, newValue); - } - } - - - // Additional methods for tests. - - /** - * Warning: this can be an expensive (time and memory) operation if there are a lot of items/containers. - */ - @VisibleForTesting - public String itemDistributionToString() { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - dumpItemDistribution(new PrintStream(baos)); - return new String(baos.toByteArray()); - } - - @VisibleForTesting - public void dumpItemDistribution() { - dumpItemDistribution(System.out); - } - - @VisibleForTesting - public void dumpItemDistribution(PrintStream out) { - Map<ItemType, Map<Location, Double>> directSendsToItemByLocation = getDirectSendsToItemByLocation(); - - out.println("Follow-The-Sun dump: "); - for (Location location: getLocations()) { - out.println("\t"+"Location "+location); - for (ContainerType container : getContainersInLocation(location)) { - out.println("\t\t"+"Container "+container); - for (ItemType item : getItemsOnContainer(container)) { - Map<Location, Double> inboundUsage = directSendsToItemByLocation.get(item); - Map<? 
extends ItemType, Double> outboundUsage = itemUsage.get(item); - double totalInboundByLocation = (inboundUsage != null) ? sum(inboundUsage.values()) : 0d; - double totalInboundByActor = (outboundUsage != null) ? sum(outboundUsage.values()) : 0d; - out.println("\t\t\t"+"Item "+item); - out.println("\t\t\t\t"+"Inbound-by-location: "+totalInboundByLocation+": "+inboundUsage); - out.println("\t\t\t\t"+"Inbound-by-actor: "+totalInboundByActor+": "+outboundUsage); - } - } - } - out.flush(); - } - - private boolean hasItem(ItemType item) { - return itemToContainer.containsKey(item); - } - - private Set<Location> getLocations() { - return ImmutableSet.copyOf(containerToLocation.values()); - } - - private Set<ContainerType> getContainersInLocation(Location location) { - Set<ContainerType> result = new LinkedHashSet<ContainerType>(); - for (Map.Entry<ContainerType, Location> entry : containerToLocation.entrySet()) { - if (location.equals(entry.getValue())) { - result.add(entry.getKey()); - } - } - return result; - } - - private Set<ItemType> getItemsOnContainer(ContainerType container) { - Set<ItemType> result = new LinkedHashSet<ItemType>(); - for (Map.Entry<ItemType, ContainerType> entry : itemToContainer.entrySet()) { - if (container.equals(entry.getValue())) { - result.add(entry.getKey()); - } - } - return result; - } - - private int getNumItems() { - return itemToContainer.size(); - } - - @SuppressWarnings("unchecked") - private ContainerType nullContainer() { - return (ContainerType) NULL; // relies on erasure - } - - private Location nullLocation() { - return NULL_LOCATION; - } - - private ContainerType toNonNullContainer(ContainerType val) { - return (val != null) ? val : nullContainer(); - } - - private Location toNonNullLocation(Location val) { - return (val != null) ? 
val : nullLocation(); - } - - private boolean isNull(Object val) { - return val == NULL || val == NULL_LOCATION; - } - - // TODO Move to utils; or stop AbstractLocation from removing things from the map! - public static <K,V> Map<K,V> newHashMap(K k, V v) { - Map<K,V> result = Maps.newLinkedHashMap(); - result.put(k, v); - return result; - } - - public static double sum(Collection<? extends Number> values) { - double total = 0; - for (Number d : values) { - total += d.doubleValue(); - } - return total; - } -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/brooklyn/policy/followthesun/FollowTheSunModel.java ---------------------------------------------------------------------- diff --git a/policy/src/main/java/brooklyn/policy/followthesun/FollowTheSunModel.java b/policy/src/main/java/brooklyn/policy/followthesun/FollowTheSunModel.java deleted file mode 100644 index a181ae4..0000000 --- a/policy/src/main/java/brooklyn/policy/followthesun/FollowTheSunModel.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package brooklyn.policy.followthesun; - -import java.util.Map; -import java.util.Set; - -import org.apache.brooklyn.api.location.Location; - -/** - * Captures the state of items, containers and locations for the purpose of moving items around - * to minimise latency. For consumption by a {@link FollowTheSunStrategy}. - */ -public interface FollowTheSunModel<ContainerType, ItemType> { - - // Attributes of the pool. - public String getName(); - - // Attributes of containers and items. - public String getName(ItemType item); - public Set<ItemType> getItems(); - public Map<ItemType, Map<Location, Double>> getDirectSendsToItemByLocation(); - public Location getItemLocation(ItemType item); - public ContainerType getItemContainer(ItemType item); - public Location getContainerLocation(ContainerType container); - public boolean hasActiveMigration(ItemType item); - public Set<ContainerType> getAvailableContainersFor(ItemType item, Location location); - public boolean isItemMoveable(ItemType item); - public boolean isItemAllowedIn(ItemType item, Location location); - - // Mutators for keeping the model in-sync with the observed world - public void onContainerAdded(ContainerType container, Location location); - public void onContainerRemoved(ContainerType container); - public void onContainerLocationUpdated(ContainerType container, Location location); - - public void onItemAdded(ItemType item, ContainerType parentContainer, boolean immovable); - public void onItemRemoved(ItemType item); - public void onItemUsageUpdated(ItemType item, Map<? 
extends ItemType, Double> newValues); - public void onItemMoved(ItemType item, ContainerType newContainer); -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/brooklyn/policy/followthesun/FollowTheSunParameters.java ---------------------------------------------------------------------- diff --git a/policy/src/main/java/brooklyn/policy/followthesun/FollowTheSunParameters.java b/policy/src/main/java/brooklyn/policy/followthesun/FollowTheSunParameters.java deleted file mode 100644 index 2b2d9af..0000000 --- a/policy/src/main/java/brooklyn/policy/followthesun/FollowTheSunParameters.java +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package brooklyn.policy.followthesun; - -import java.util.LinkedHashSet; -import java.util.Set; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.brooklyn.api.location.Location; - -public class FollowTheSunParameters { - - private static final Logger LOG = LoggerFactory.getLogger(FollowTheSunParameters.class); - - private FollowTheSunParameters() {} - - /** trigger for moving segment X from geo A to geo B: - * where x is total number of requests submitted in X across the CDM network, - * and x_A is number of reqs from geo A, with A the most prolific geography - * (arbitrarily chosen in case of ties so recommended to choose at least a small percent_majority or delta_above_percent_majority, in addition to this field); - * this parameter T defines a number such that x_A > T*x in order for X to be migrated to A - * (but see also DELTA_ABOVE_PERCENT_TOTAL, below) */ - public double triggerPercentTotal = 0.3; - /** fields as above, and T as above, - * this parameter T' defines a number such that x_A > T*x + T' in order for X to be migrated to A */ - public double triggerDeltaAbovePercentTotal = 0; - /** fields as above, - * this parameter T defines a number such that x_A > T in order for X to be migrated to A */ - public double triggerAbsoluteTotal = 2; - - /** fields as above, with X_B the number from a different geography B, - * where A and B are the two most prolific requesters of X, and X_A >= X_B; - * this parameter T defines a number such that x_A-x_B > T*x in order for X to be migrated to A */ - public double triggerPercentMajority = 0.2; - /** as corresponding majority and total fields, with x_A-x_B on the LHS of inequality */ - public double triggerDeltaAbovePercentMajority = 1; - /** as corresponding majority and total fields, with x_A-x_B on the LHS of inequality */ - public double triggerAbsoluteMajority = 4; - - /** a list of excluded locations */ - public Set<Location> excludedLocations = new LinkedHashSet<Location>(); - 
- public static FollowTheSunParameters newDefault() { - return new FollowTheSunParameters(); - } - - private static double parseDouble(String text, double defaultValue) { - try { - double d = Double.parseDouble(text); - if (!Double.isNaN(d)) return d; - } catch (Exception e) { - LOG.warn("Illegal double value '"+text+"', using default "+defaultValue+": "+e, e); - } - return defaultValue; - } - - private static String[] parseCommaSeparatedList(String csv) { - if (csv==null || csv.trim().length()==0) return new String[0]; - return csv.split(","); - } - - public boolean isTriggered(double highest, double total, double nextHighest, double current) { - if (highest <= current) return false; - if (highest < total*triggerPercentTotal + triggerDeltaAbovePercentTotal) return false; - if (highest < triggerAbsoluteTotal) return false; - //TODO more params about nextHighest vs current - if (highest-current < total*triggerPercentMajority + triggerDeltaAbovePercentMajority) return false; - if (highest-current < triggerAbsoluteMajority) return false; - return true; - } - - public String toString() { - return "Inter-geography policy params: percentTotal="+triggerPercentTotal+"; deltaAbovePercentTotal="+triggerDeltaAbovePercentTotal+ - "; absoluteTotal="+triggerAbsoluteTotal+"; percentMajority="+triggerPercentMajority+ - "; deltaAbovePercentMajority="+triggerDeltaAbovePercentMajority+"; absoluteMajority="+triggerAbsoluteMajority; - - } -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/brooklyn/policy/followthesun/FollowTheSunPolicy.java ---------------------------------------------------------------------- diff --git a/policy/src/main/java/brooklyn/policy/followthesun/FollowTheSunPolicy.java b/policy/src/main/java/brooklyn/policy/followthesun/FollowTheSunPolicy.java deleted file mode 100644 index b76682c..0000000 --- a/policy/src/main/java/brooklyn/policy/followthesun/FollowTheSunPolicy.java +++ /dev/null @@ -1,280 +0,0 @@ -/* - * 
Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.policy.followthesun; - -import static brooklyn.util.JavaGroovyEquivalents.elvis; -import static brooklyn.util.JavaGroovyEquivalents.groovyTruth; -import static com.google.common.base.Preconditions.checkArgument; - -import java.util.Collection; -import java.util.Map; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - -import org.apache.brooklyn.api.entity.Entity; -import org.apache.brooklyn.api.entity.basic.EntityLocal; -import org.apache.brooklyn.api.event.AttributeSensor; -import org.apache.brooklyn.api.event.Sensor; -import org.apache.brooklyn.api.event.SensorEvent; -import org.apache.brooklyn.api.event.SensorEventListener; -import org.apache.brooklyn.api.location.Location; -import org.apache.brooklyn.api.location.MachineProvisioningLocation; -import org.apache.brooklyn.core.policy.basic.AbstractPolicy; -import org.apache.brooklyn.core.util.flags.SetFromFlag; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import brooklyn.entity.basic.Attributes; -import 
brooklyn.policy.followthesun.FollowTheSunPool.ContainerItemPair; -import brooklyn.policy.loadbalancing.Movable; -import brooklyn.util.collections.MutableMap; - -import com.google.common.base.Function; -import com.google.common.collect.Iterables; -import com.google.common.util.concurrent.ThreadFactoryBuilder; - - // removed from catalog because it cannot currently be configured via catalog mechanisms - - // PolicySpec.create fails due to no no-arg constructor - // TODO make model and parameters things which can be initialized from config then reinstate in catalog -//@Catalog(name="Follow the Sun", description="Policy for moving \"work\" around to follow the demand; " -// + "the work can be any \"Movable\" entity") -public class FollowTheSunPolicy extends AbstractPolicy { - - private static final Logger LOG = LoggerFactory.getLogger(FollowTheSunPolicy.class); - - public static final String NAME = "Follow the Sun (Inter-Geography Latency Optimization)"; - - @SetFromFlag(defaultVal="100") - private long minPeriodBetweenExecs; - - @SetFromFlag - private Function<Entity, Location> locationFinder; - - private final AttributeSensor<Map<? 
extends Movable, Double>> itemUsageMetric; - private final FollowTheSunModel<Entity, Movable> model; - private final FollowTheSunStrategy<Entity, Movable> strategy; - private final FollowTheSunParameters parameters; - - private FollowTheSunPool poolEntity; - - private volatile ScheduledExecutorService executor; - private final AtomicBoolean executorQueued = new AtomicBoolean(false); - private volatile long executorTime = 0; - private boolean loggedConstraintsIgnored = false; - - private final Function<Entity, Location> defaultLocationFinder = new Function<Entity, Location>() { - public Location apply(Entity e) { - Collection<Location> locs = e.getLocations(); - if (locs.isEmpty()) return null; - Location contender = Iterables.get(locs, 0); - while (contender.getParent() != null && !(contender instanceof MachineProvisioningLocation)) { - contender = contender.getParent(); - } - return contender; - } - }; - - private final SensorEventListener<Object> eventHandler = new SensorEventListener<Object>() { - @Override - public void onEvent(SensorEvent<Object> event) { - if (LOG.isTraceEnabled()) LOG.trace("{} received event {}", FollowTheSunPolicy.this, event); - Entity source = event.getSource(); - Object value = event.getValue(); - Sensor<?> sensor = event.getSensor(); - - if (sensor.equals(itemUsageMetric)) { - onItemMetricUpdated((Movable)source, (Map<? 
extends Movable, Double>) value, true); - } else if (sensor.equals(Attributes.LOCATION_CHANGED)) { - onContainerLocationUpdated(source, true); - } else if (sensor.equals(FollowTheSunPool.CONTAINER_ADDED)) { - onContainerAdded((Entity) value, true); - } else if (sensor.equals(FollowTheSunPool.CONTAINER_REMOVED)) { - onContainerRemoved((Entity) value, true); - } else if (sensor.equals(FollowTheSunPool.ITEM_ADDED)) { - onItemAdded((Movable) value, true); - } else if (sensor.equals(FollowTheSunPool.ITEM_REMOVED)) { - onItemRemoved((Movable) value, true); - } else if (sensor.equals(FollowTheSunPool.ITEM_MOVED)) { - ContainerItemPair pair = (ContainerItemPair) value; - onItemMoved((Movable)pair.item, pair.container, true); - } - } - }; - - // FIXME parameters: use a more groovy way of doing it, that's consistent with other policies/entities? - public FollowTheSunPolicy(AttributeSensor itemUsageMetric, - FollowTheSunModel<Entity, Movable> model, FollowTheSunParameters parameters) { - this(MutableMap.of(), itemUsageMetric, model, parameters); - } - - public FollowTheSunPolicy(Map props, AttributeSensor itemUsageMetric, - FollowTheSunModel<Entity, Movable> model, FollowTheSunParameters parameters) { - super(props); - this.itemUsageMetric = itemUsageMetric; - this.model = model; - this.parameters = parameters; - this.strategy = new FollowTheSunStrategy<Entity, Movable>(model, parameters); // TODO: extract interface, inject impl - this.locationFinder = elvis(locationFinder, defaultLocationFinder); - - // TODO Should re-use the execution manager's thread pool, somehow - executor = Executors.newSingleThreadScheduledExecutor(newThreadFactory()); - } - - @Override - public void setEntity(EntityLocal entity) { - checkArgument(entity instanceof FollowTheSunPool, "Provided entity must be a FollowTheSunPool"); - super.setEntity(entity); - this.poolEntity = (FollowTheSunPool) entity; - - // Detect when containers are added to or removed from the pool. 
- subscribe(poolEntity, FollowTheSunPool.CONTAINER_ADDED, eventHandler); - subscribe(poolEntity, FollowTheSunPool.CONTAINER_REMOVED, eventHandler); - subscribe(poolEntity, FollowTheSunPool.ITEM_ADDED, eventHandler); - subscribe(poolEntity, FollowTheSunPool.ITEM_REMOVED, eventHandler); - subscribe(poolEntity, FollowTheSunPool.ITEM_MOVED, eventHandler); - - // Take heed of any extant containers. - for (Entity container : poolEntity.getContainerGroup().getMembers()) { - onContainerAdded(container, false); - } - for (Entity item : poolEntity.getItemGroup().getMembers()) { - onItemAdded((Movable)item, false); - } - - scheduleLatencyReductionJig(); - } - - @Override - public void suspend() { - // TODO unsubscribe from everything? And resubscribe on resume? - super.suspend(); - if (executor != null) executor.shutdownNow(); - executorQueued.set(false); - } - - @Override - public void resume() { - super.resume(); - executor = Executors.newSingleThreadScheduledExecutor(newThreadFactory()); - executorTime = 0; - executorQueued.set(false); - } - - private ThreadFactory newThreadFactory() { - return new ThreadFactoryBuilder() - .setNameFormat("brooklyn-followthesunpolicy-%d") - .build(); - } - - private void scheduleLatencyReductionJig() { - if (isRunning() && executorQueued.compareAndSet(false, true)) { - long now = System.currentTimeMillis(); - long delay = Math.max(0, (executorTime + minPeriodBetweenExecs) - now); - - executor.schedule(new Runnable() { - public void run() { - try { - executorTime = System.currentTimeMillis(); - executorQueued.set(false); - - if (LOG.isTraceEnabled()) LOG.trace("{} executing follow-the-sun migration-strategy", this); - strategy.rebalance(); - - } catch (RuntimeException e) { - if (isRunning()) { - LOG.error("Error during latency-reduction-jig", e); - } else { - LOG.debug("Error during latency-reduction-jig, but no longer running", e); - } - } - }}, - delay, - TimeUnit.MILLISECONDS); - } - } - - private void onContainerAdded(Entity container, 
boolean rebalanceNow) { - subscribe(container, Attributes.LOCATION_CHANGED, eventHandler); - Location location = locationFinder.apply(container); - - if (LOG.isTraceEnabled()) LOG.trace("{} recording addition of container {} in location {}", new Object[] {this, container, location}); - model.onContainerAdded(container, location); - - if (rebalanceNow) scheduleLatencyReductionJig(); - } - - private void onContainerRemoved(Entity container, boolean rebalanceNow) { - if (LOG.isTraceEnabled()) LOG.trace("{} recording removal of container {}", this, container); - model.onContainerRemoved(container); - if (rebalanceNow) scheduleLatencyReductionJig(); - } - - private void onItemAdded(Movable item, boolean rebalanceNow) { - Entity parentContainer = (Entity) item.getAttribute(Movable.CONTAINER); - - if (LOG.isTraceEnabled()) LOG.trace("{} recording addition of item {} in container {}", new Object[] {this, item, parentContainer}); - - subscribe(item, itemUsageMetric, eventHandler); - - // Update the model, including the current metric value (if any). - Map<? 
extends Movable, Double> currentValue = item.getAttribute(itemUsageMetric); - boolean immovable = (Boolean)elvis(item.getConfig(Movable.IMMOVABLE), false); - model.onItemAdded(item, parentContainer, immovable); - - if (currentValue != null) { - model.onItemUsageUpdated(item, currentValue); - } - - if (rebalanceNow) scheduleLatencyReductionJig(); - } - - private void onItemRemoved(Movable item, boolean rebalanceNow) { - if (LOG.isTraceEnabled()) LOG.trace("{} recording removal of item {}", this, item); - unsubscribe(item); - model.onItemRemoved(item); - if (rebalanceNow) scheduleLatencyReductionJig(); - } - - private void onItemMoved(Movable item, Entity parentContainer, boolean rebalanceNow) { - if (LOG.isTraceEnabled()) LOG.trace("{} recording moving of item {} to {}", new Object[] {this, item, parentContainer}); - model.onItemMoved(item, parentContainer); - if (rebalanceNow) scheduleLatencyReductionJig(); - } - - private void onContainerLocationUpdated(Entity container, boolean rebalanceNow) { - Location location = locationFinder.apply(container); - if (LOG.isTraceEnabled()) LOG.trace("{} recording location for container {}, new value {}", new Object[] {this, container, location}); - model.onContainerLocationUpdated(container, location); - if (rebalanceNow) scheduleLatencyReductionJig(); - } - - private void onItemMetricUpdated(Movable item, Map<? extends Movable, Double> newValues, boolean rebalanceNow) { - if (LOG.isTraceEnabled()) LOG.trace("{} recording usage update for item {}, new value {}", new Object[] {this, item, newValues}); - model.onItemUsageUpdated(item, newValues); - if (rebalanceNow) scheduleLatencyReductionJig(); - } - - @Override - public String toString() { - return getClass().getSimpleName() + (groovyTruth(name) ? 
"("+name+")" : ""); - } -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/brooklyn/policy/followthesun/FollowTheSunPool.java ---------------------------------------------------------------------- diff --git a/policy/src/main/java/brooklyn/policy/followthesun/FollowTheSunPool.java b/policy/src/main/java/brooklyn/policy/followthesun/FollowTheSunPool.java deleted file mode 100644 index 402eeef..0000000 --- a/policy/src/main/java/brooklyn/policy/followthesun/FollowTheSunPool.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.policy.followthesun; - -import static com.google.common.base.Preconditions.checkNotNull; - -import java.io.Serializable; - -import org.apache.brooklyn.api.entity.Entity; -import org.apache.brooklyn.api.entity.Group; -import org.apache.brooklyn.api.entity.proxying.ImplementedBy; - -import brooklyn.entity.trait.Resizable; -import brooklyn.event.basic.BasicNotificationSensor; - -@ImplementedBy(FollowTheSunPoolImpl.class) -public interface FollowTheSunPool extends Entity, Resizable { - - // FIXME Remove duplication from BalanceableWorkerPool? 
- - // FIXME Asymmetry between loadbalancing and followTheSun: ITEM_ADDED and ITEM_REMOVED in loadbalancing - // are of type ContainerItemPair, but in followTheSun it is just the `Entity item`. - - /** Encapsulates an item and a container; emitted by sensors. - */ - public static class ContainerItemPair implements Serializable { - private static final long serialVersionUID = 1L; - public final Entity container; - public final Entity item; - - public ContainerItemPair(Entity container, Entity item) { - this.container = container; - this.item = checkNotNull(item); - } - - @Override - public String toString() { - return ""+item+" @ "+container; - } - } - - // Pool constituent notifications. - public static BasicNotificationSensor<Entity> CONTAINER_ADDED = new BasicNotificationSensor<Entity>( - Entity.class, "followthesun.container.added", "Container added"); - public static BasicNotificationSensor<Entity> CONTAINER_REMOVED = new BasicNotificationSensor<Entity>( - Entity.class, "followthesun.container.removed", "Container removed"); - public static BasicNotificationSensor<Entity> ITEM_ADDED = new BasicNotificationSensor<Entity>( - Entity.class, "followthesun.item.added", "Item added"); - public static BasicNotificationSensor<Entity> ITEM_REMOVED = new BasicNotificationSensor<Entity>( - Entity.class, "followthesun.item.removed", "Item removed"); - public static BasicNotificationSensor<ContainerItemPair> ITEM_MOVED = new BasicNotificationSensor<ContainerItemPair>( - ContainerItemPair.class, "followthesun.item.moved", "Item moved to the given container"); - - public void setContents(Group containerGroup, Group itemGroup); - - public Group getContainerGroup(); - - public Group getItemGroup(); -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/brooklyn/policy/followthesun/FollowTheSunPoolImpl.java ---------------------------------------------------------------------- diff --git 
a/policy/src/main/java/brooklyn/policy/followthesun/FollowTheSunPoolImpl.java b/policy/src/main/java/brooklyn/policy/followthesun/FollowTheSunPoolImpl.java deleted file mode 100644 index 4d74441..0000000 --- a/policy/src/main/java/brooklyn/policy/followthesun/FollowTheSunPoolImpl.java +++ /dev/null @@ -1,178 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.policy.followthesun; - -import java.util.Collections; -import java.util.HashSet; -import java.util.Set; - -import org.apache.brooklyn.api.entity.Entity; -import org.apache.brooklyn.api.entity.Group; -import org.apache.brooklyn.api.event.Sensor; -import org.apache.brooklyn.api.event.SensorEvent; -import org.apache.brooklyn.api.event.SensorEventListener; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import brooklyn.entity.basic.AbstractEntity; -import brooklyn.entity.basic.AbstractGroup; -import brooklyn.entity.trait.Resizable; -import brooklyn.entity.trait.Startable; -import brooklyn.policy.loadbalancing.Movable; - -public class FollowTheSunPoolImpl extends AbstractEntity implements FollowTheSunPool { - - // FIXME Remove duplication from BalanceableWorkerPool? 
- - // FIXME Asymmetry between loadbalancing and followTheSun: ITEM_ADDED and ITEM_REMOVED in loadbalancing - // are of type ContainerItemPair, but in followTheSun it is just the `Entity item`. - - private static final Logger LOG = LoggerFactory.getLogger(FollowTheSunPool.class); - - private Group containerGroup; - private Group itemGroup; - - private final Set<Entity> containers = Collections.synchronizedSet(new HashSet<Entity>()); - private final Set<Entity> items = Collections.synchronizedSet(new HashSet<Entity>()); - - private final SensorEventListener<Object> eventHandler = new SensorEventListener<Object>() { - @Override - public void onEvent(SensorEvent<Object> event) { - if (LOG.isTraceEnabled()) LOG.trace("{} received event {}", FollowTheSunPoolImpl.this, event); - Entity source = event.getSource(); - Object value = event.getValue(); - Sensor sensor = event.getSensor(); - - if (sensor.equals(AbstractGroup.MEMBER_ADDED)) { - if (source.equals(containerGroup)) { - onContainerAdded((Entity) value); - } else if (source.equals(itemGroup)) { - onItemAdded((Entity)value); - } else { - throw new IllegalStateException("unexpected event source="+source); - } - } else if (sensor.equals(AbstractGroup.MEMBER_REMOVED)) { - if (source.equals(containerGroup)) { - onContainerRemoved((Entity) value); - } else if (source.equals(itemGroup)) { - onItemRemoved((Entity) value); - } else { - throw new IllegalStateException("unexpected event source="+source); - } - } else if (sensor.equals(Startable.SERVICE_UP)) { - // TODO What if start has failed? Is there a sensor to indicate that? 
- if ((Boolean)value) { - onContainerUp(source); - } else { - onContainerDown(source); - } - } else if (sensor.equals(Movable.CONTAINER)) { - onItemMoved(source, (Entity) value); - } else { - throw new IllegalStateException("Unhandled event type "+sensor+": "+event); - } - } - }; - - public FollowTheSunPoolImpl() { - } - - @Override - public void setContents(Group containerGroup, Group itemGroup) { - this.containerGroup = containerGroup; - this.itemGroup = itemGroup; - subscribe(containerGroup, AbstractGroup.MEMBER_ADDED, eventHandler); - subscribe(containerGroup, AbstractGroup.MEMBER_REMOVED, eventHandler); - subscribe(itemGroup, AbstractGroup.MEMBER_ADDED, eventHandler); - subscribe(itemGroup, AbstractGroup.MEMBER_REMOVED, eventHandler); - - // Process extant containers and items - for (Entity existingContainer : containerGroup.getMembers()) { - onContainerAdded(existingContainer); - } - for (Entity existingItem : itemGroup.getMembers()) { - onItemAdded((Entity)existingItem); - } - } - - @Override - public Group getContainerGroup() { - return containerGroup; - } - - @Override - public Group getItemGroup() { - return itemGroup; - } - - @Override - public Integer getCurrentSize() { - return containerGroup.getCurrentSize(); - } - - @Override - public Integer resize(Integer desiredSize) { - if (containerGroup instanceof Resizable) return ((Resizable) containerGroup).resize(desiredSize); - - throw new UnsupportedOperationException("Container group is not resizable"); - } - - - private void onContainerAdded(Entity newContainer) { - subscribe(newContainer, Startable.SERVICE_UP, eventHandler); - if (!(newContainer instanceof Startable) || Boolean.TRUE.equals(newContainer.getAttribute(Startable.SERVICE_UP))) { - onContainerUp(newContainer); - } - } - - private void onContainerUp(Entity newContainer) { - if (containers.add(newContainer)) { - emit(CONTAINER_ADDED, newContainer); - } - } - - private void onContainerDown(Entity oldContainer) { - if 
(containers.remove(oldContainer)) { - emit(CONTAINER_REMOVED, oldContainer); - } - } - - private void onContainerRemoved(Entity oldContainer) { - unsubscribe(oldContainer); - onContainerDown(oldContainer); - } - - private void onItemAdded(Entity item) { - if (items.add(item)) { - subscribe(item, Movable.CONTAINER, eventHandler); - emit(ITEM_ADDED, item); - } - } - - private void onItemRemoved(Entity item) { - if (items.remove(item)) { - unsubscribe(item); - emit(ITEM_REMOVED, item); - } - } - - private void onItemMoved(Entity item, Entity container) { - emit(ITEM_MOVED, new ContainerItemPair(container, item)); - } -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/brooklyn/policy/followthesun/FollowTheSunStrategy.java ---------------------------------------------------------------------- diff --git a/policy/src/main/java/brooklyn/policy/followthesun/FollowTheSunStrategy.java b/policy/src/main/java/brooklyn/policy/followthesun/FollowTheSunStrategy.java deleted file mode 100644 index e8ca636..0000000 --- a/policy/src/main/java/brooklyn/policy/followthesun/FollowTheSunStrategy.java +++ /dev/null @@ -1,161 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package brooklyn.policy.followthesun; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import org.apache.brooklyn.api.entity.Entity; -import org.apache.brooklyn.api.location.Location; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import brooklyn.policy.loadbalancing.Movable; - -import com.google.common.collect.Iterables; - -// TODO: extract interface -public class FollowTheSunStrategy<ContainerType extends Entity, ItemType extends Movable> { - - // This is a modified version of the InterGeographyLatencyPolicy (aka Follow-The-Sun) policy from Monterey v3. - - // TODO location constraints - - private static final Logger LOG = LoggerFactory.getLogger(FollowTheSunStrategy.class); - - private final FollowTheSunParameters parameters; - private final FollowTheSunModel<ContainerType,ItemType> model; - private final String name; - - public FollowTheSunStrategy(FollowTheSunModel<ContainerType,ItemType> model, FollowTheSunParameters parameters) { - this.model = model; - this.parameters = parameters; - this.name = model.getName(); - } - - public void rebalance() { - try { - Set<ItemType> items = model.getItems(); - Map<ItemType, Map<Location, Double>> directSendsToItemByLocation = model.getDirectSendsToItemByLocation(); - - for (ItemType item : items) { - String itemName = model.getName(item); - Location activeLocation = model.getItemLocation(item); - ContainerType activeContainer = model.getItemContainer(item); - Map<Location, Double> sendsByLocation = directSendsToItemByLocation.get(item); - if (sendsByLocation == null) sendsByLocation = Collections.emptyMap(); - - if (parameters.excludedLocations.contains(activeLocation)) { - if (LOG.isTraceEnabled()) LOG.trace("Ignoring segment {} as it is in {}", itemName, activeLocation); - continue; - } - if (!model.isItemMoveable(item)) { - if (LOG.isDebugEnabled()) LOG.debug("POLICY {} skipping any 
migration of {}, it is not moveable", name, itemName); - continue; - } - if (model.hasActiveMigration(item)) { - LOG.info("POLICY {} skipping any migration of {}, it is involved in an active migration already", name, itemName); - continue; - } - - double total = DefaultFollowTheSunModel.sum(sendsByLocation.values()); - - if (LOG.isTraceEnabled()) LOG.trace("POLICY {} detected {} msgs/sec in {}, split up as: {}", new Object[] {name, total, itemName, sendsByLocation}); - - Double current = sendsByLocation.get(activeLocation); - if (current == null) current=0d; - List<WeightedObject<Location>> locationsWtd = new ArrayList<WeightedObject<Location>>(); - if (total > 0) { - for (Map.Entry<Location, Double> entry : sendsByLocation.entrySet()) { - Location l = entry.getKey(); - Double d = entry.getValue(); - if (d > current) locationsWtd.add(new WeightedObject<Location>(l, d)); - } - } - Collections.sort(locationsWtd); - Collections.reverse(locationsWtd); - - double highestMsgRate = -1; - Location highestLocation = null; - ContainerType optimalContainerInHighest = null; - while (!locationsWtd.isEmpty()) { - WeightedObject<Location> weightedObject = locationsWtd.remove(0); - highestMsgRate = weightedObject.getWeight(); - highestLocation = weightedObject.getObject(); - optimalContainerInHighest = findOptimal(model.getAvailableContainersFor(item, highestLocation)); - if (optimalContainerInHighest != null) { - break; - } - } - if (optimalContainerInHighest == null) { - if (LOG.isDebugEnabled()) LOG.debug("POLICY {} detected {} is already in optimal permitted location ({} of {} msgs/sec)", new Object[] {name, itemName, highestMsgRate, total}); - continue; - } - - double nextHighestMsgRate = -1; - ContainerType optimalContainerInNextHighest = null; - while (!locationsWtd.isEmpty()) { - WeightedObject<Location> weightedObject = locationsWtd.remove(0); - nextHighestMsgRate = weightedObject.getWeight(); - Location nextHighestLocation = weightedObject.getObject(); - 
optimalContainerInNextHighest = findOptimal(model.getAvailableContainersFor(item, nextHighestLocation)); - if (optimalContainerInNextHighest != null) { - break; - } - } - if (optimalContainerInNextHighest == null) { - nextHighestMsgRate = current; - } - - if (parameters.isTriggered(highestMsgRate, total, nextHighestMsgRate, current)) { - LOG.info("POLICY "+name+" detected "+itemName+" should be in location "+highestLocation+" on "+optimalContainerInHighest+" ("+highestMsgRate+" of "+total+" msgs/sec), migrating"); - try { - if (activeContainer.equals(optimalContainerInHighest)) { - //shouldn't happen - LOG.warn("POLICY "+name+" detected "+itemName+" should move to "+optimalContainerInHighest+" ("+highestMsgRate+" of "+total+" msgs/sec) but it is already there with "+current+" msgs/sec"); - } else { - item.move(optimalContainerInHighest); - model.onItemMoved(item, optimalContainerInHighest); - } - } catch (Exception e) { - LOG.warn("POLICY "+name+" detected "+itemName+" should be on "+optimalContainerInHighest+", but can't move it: "+e, e); - } - } else { - if (LOG.isTraceEnabled()) LOG.trace("POLICY "+name+" detected "+itemName+" need not move to "+optimalContainerInHighest+" ("+highestMsgRate+" of "+total+" msgs/sec not much better than "+current+" at "+activeContainer+")"); - } - } - } catch (Exception e) { - LOG.warn("Error in policy "+name+" (ignoring): "+e, e); - } - } - - private ContainerType findOptimal(Collection<ContainerType> contenders) { - /* - * TODO should choose the least loaded mediator. Currently chooses first available, and relies - * on a load-balancer to move it again; would be good if these could share decision code so move - * it to the right place immediately. e.g. - * policyUtil.findLeastLoadedMediator(nodesInLocation); - */ - return (contenders.isEmpty() ? 
null : Iterables.get(contenders, 0)); - } -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/brooklyn/policy/followthesun/WeightedObject.java ---------------------------------------------------------------------- diff --git a/policy/src/main/java/brooklyn/policy/followthesun/WeightedObject.java b/policy/src/main/java/brooklyn/policy/followthesun/WeightedObject.java deleted file mode 100644 index b1d506d..0000000 --- a/policy/src/main/java/brooklyn/policy/followthesun/WeightedObject.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.policy.followthesun; - -public class WeightedObject<T> implements Comparable<WeightedObject<T>>{ - - final T object; - final double weight; - - public WeightedObject(T obj, double weight) { - this.object = obj; - this.weight = weight; - } - - public T getObject() { - return object; - } - - public double getWeight() { - return weight; - } - - /** - * Note that equals and compareTo are not consistent: x.compareTo(y)==0 iff x.equals(y) is - * highly recommended in Java, but is not required. This can make TreeSet etc behave poorly... 
- */ - public int compareTo(WeightedObject<T> o) { - double diff = o.getWeight() - weight; - if (diff>0.0000000000000001) return -1; - if (diff<-0.0000000000000001) return 1; - return 0; - } - - @Override - /** true irrespective of weight */ - public boolean equals(Object obj) { - if (!(obj instanceof WeightedObject<?>)) return false; - if (getObject()==null) { - return ((WeightedObject<?>)obj).getObject() == null; - } else { - return getObject().equals( ((WeightedObject<?>)obj).getObject() ); - } - } - - @Override - public int hashCode() { - if (getObject()==null) return 234519078; - return getObject().hashCode(); - } - - @Override - public String toString() { - return ""+getObject()+"["+getWeight()+"]"; - } -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/brooklyn/policy/ha/AbstractFailureDetector.java ---------------------------------------------------------------------- diff --git a/policy/src/main/java/brooklyn/policy/ha/AbstractFailureDetector.java b/policy/src/main/java/brooklyn/policy/ha/AbstractFailureDetector.java deleted file mode 100644 index 9d8c58f..0000000 --- a/policy/src/main/java/brooklyn/policy/ha/AbstractFailureDetector.java +++ /dev/null @@ -1,361 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. 
See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.policy.ha; - -import static brooklyn.util.time.Time.makeTimeStringRounded; - -import java.util.concurrent.Callable; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; - -import org.apache.brooklyn.api.entity.basic.EntityLocal; -import org.apache.brooklyn.api.event.Sensor; -import org.apache.brooklyn.api.management.Task; -import org.apache.brooklyn.core.policy.basic.AbstractPolicy; -import org.apache.brooklyn.core.util.flags.SetFromFlag; -import org.apache.brooklyn.core.util.task.BasicTask; -import org.apache.brooklyn.core.util.task.ScheduledTask; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import brooklyn.config.ConfigKey; -import brooklyn.entity.basic.BrooklynTaskTags; -import brooklyn.entity.basic.ConfigKeys; -import brooklyn.entity.basic.EntityInternal; -import brooklyn.policy.ha.HASensors.FailureDescriptor; -import brooklyn.util.collections.MutableMap; -import brooklyn.util.exceptions.Exceptions; -import brooklyn.util.time.Duration; -import brooklyn.util.time.Time; - -import com.google.common.reflect.TypeToken; - -public abstract class AbstractFailureDetector extends AbstractPolicy { - - // TODO Remove duplication from ServiceFailureDetector, particularly for the stabilisation delays. 
    /**
     * Outcome of one health calculation, as produced by {@code calculateStatus()}.
     */
    public interface CalculatedStatus {
        /** @return true if the calculation found the subject healthy, false if it is failing */
        boolean isHealthy();

        /**
         * @return a description of this status
         *         (NOTE(review): presumably human-readable text for logs and published events; confirm against callers)
         */
        String getDescription();
    }
- } else { - if (LOG.isDebugEnabled()) LOG.debug("Problem resizing, but no longer running: "+e, e); - } - } catch (Throwable t) { - LOG.error("Problem in service-failure-detector: "+t, t); - throw Exceptions.propagate(t); - } - } - } - - private final class HealthPoller implements Runnable { - @Override - public void run() { - checkHealth(); - } - } - - private final class HealthPollingTaskFactory implements Callable<Task<?>> { - @Override - public Task<?> call() { - BasicTask<Void> task = new BasicTask<Void>(new HealthPoller()); - BrooklynTaskTags.setTransient(task); - return task; - } - } - - protected static class BasicCalculatedStatus implements CalculatedStatus { - private boolean healthy; - private String description; - - public BasicCalculatedStatus(boolean healthy, String description) { - this.healthy = healthy; - this.description = description; - } - - @Override - public boolean isHealthy() { - return healthy; - } - - @Override - public String getDescription() { - return description; - } - } - - public enum LastPublished { - NONE, - FAILED, - RECOVERED; - } - - protected final AtomicReference<Long> stateLastGood = new AtomicReference<Long>(); - protected final AtomicReference<Long> stateLastFail = new AtomicReference<Long>(); - - protected Long currentFailureStartTime = null; - protected Long currentRecoveryStartTime = null; - - protected LastPublished lastPublished = LastPublished.NONE; - - private final AtomicBoolean executorQueued = new AtomicBoolean(false); - private volatile long executorTime = 0; - - private Callable<Task<?>> pollingTaskFactory = new HealthPollingTaskFactory(); - - private Task<?> scheduledTask; - - protected abstract CalculatedStatus calculateStatus(); - - @Override - public void setEntity(EntityLocal entity) { - super.setEntity(entity); - - if (isRunning()) { - doStartPolling(); - } - } - - @Override - public void suspend() { - scheduledTask.cancel(true); - super.suspend(); - } - - @Override - public void resume() { - 
currentFailureStartTime = null; - currentRecoveryStartTime = null; - lastPublished = LastPublished.NONE; - executorQueued.set(false); - executorTime = 0; - - super.resume(); - doStartPolling(); - } - - @SuppressWarnings("unchecked") - protected void doStartPolling() { - if (scheduledTask == null || scheduledTask.isDone()) { - ScheduledTask task = new ScheduledTask(MutableMap.of("period", getPollPeriod(), "displayName", getTaskName()), pollingTaskFactory); - scheduledTask = ((EntityInternal)entity).getExecutionContext().submit(task); - } - } - - private String getTaskName() { - return getDisplayName(); - } - - protected Duration getPollPeriod() { - return getConfig(POLL_PERIOD); - } - - protected Duration getFailedStabilizationDelay() { - return getConfig(FAILED_STABILIZATION_DELAY); - } - - protected Duration getRecoveredStabilizationDelay() { - return getConfig(RECOVERED_STABILIZATION_DELAY); - } - - protected Sensor<FailureDescriptor> getSensorFailed() { - return getConfig(SENSOR_FAILED); - } - - protected Sensor<FailureDescriptor> getSensorRecovered() { - return getConfig(SENSOR_RECOVERED); - } - - private synchronized void checkHealth() { - CalculatedStatus status = calculateStatus(); - boolean healthy = status.isHealthy(); - long now = System.currentTimeMillis(); - - if (healthy) { - stateLastGood.set(now); - if (lastPublished == LastPublished.FAILED) { - if (currentRecoveryStartTime == null) { - LOG.info("{} check for {}, now recovering: {}", new Object[] {this, entity, getDescription(status)}); - currentRecoveryStartTime = now; - schedulePublish(); - } else { - if (LOG.isTraceEnabled()) LOG.trace("{} check for {}, continuing recovering: {}", new Object[] {this, entity, getDescription(status)}); - } - } else { - if (currentFailureStartTime != null) { - LOG.info("{} check for {}, now healthy: {}", new Object[] {this, entity, getDescription(status)}); - currentFailureStartTime = null; - } else { - if (LOG.isTraceEnabled()) LOG.trace("{} check for {}, still 
healthy: {}", new Object[] {this, entity, getDescription(status)}); - } - } - } else { - stateLastFail.set(now); - if (lastPublished != LastPublished.FAILED) { - if (currentFailureStartTime == null) { - LOG.info("{} check for {}, now failing: {}", new Object[] {this, entity, getDescription(status)}); - currentFailureStartTime = now; - schedulePublish(); - } else { - if (LOG.isTraceEnabled()) LOG.trace("{} check for {}, continuing failing: {}", new Object[] {this, entity, getDescription(status)}); - } - } else { - if (currentRecoveryStartTime != null) { - LOG.info("{} check for {}, now failing: {}", new Object[] {this, entity, getDescription(status)}); - currentRecoveryStartTime = null; - } else { - if (LOG.isTraceEnabled()) LOG.trace("{} check for {}, still failed: {}", new Object[] {this, entity, getDescription(status)}); - } - } - } - } - - protected void schedulePublish() { - schedulePublish(0); - } - - @SuppressWarnings("unchecked") - protected void schedulePublish(long delay) { - if (isRunning() && executorQueued.compareAndSet(false, true)) { - long now = System.currentTimeMillis(); - delay = Math.max(0, Math.max(delay, (executorTime + MIN_PERIOD_BETWEEN_EXECS_MILLIS) - now)); - if (LOG.isTraceEnabled()) LOG.trace("{} scheduling publish in {}ms", this, delay); - - Runnable job = new PublishJob(); - - ScheduledTask task = new ScheduledTask(MutableMap.of("delay", Duration.of(delay, TimeUnit.MILLISECONDS)), new BasicTask<Void>(job)); - ((EntityInternal)entity).getExecutionContext().submit(task); - } - } - - private synchronized void publishNow() { - if (!isRunning()) return; - - CalculatedStatus calculatedStatus = calculateStatus(); - boolean healthy = calculatedStatus.isHealthy(); - - Long lastUpTime = stateLastGood.get(); - Long lastDownTime = stateLastFail.get(); - long serviceFailedStabilizationDelay = getFailedStabilizationDelay().toMilliseconds(); - long serviceRecoveredStabilizationDelay = getRecoveredStabilizationDelay().toMilliseconds(); - long now = 
System.currentTimeMillis(); - - if (healthy) { - if (lastPublished == LastPublished.FAILED) { - // only publish if consistently up for serviceRecoveredStabilizationDelay - long currentRecoveryPeriod = getTimeDiff(now, currentRecoveryStartTime); - long sinceLastDownPeriod = getTimeDiff(now, lastDownTime); - if (currentRecoveryPeriod > serviceRecoveredStabilizationDelay && sinceLastDownPeriod > serviceRecoveredStabilizationDelay) { - String description = getDescription(calculatedStatus); - LOG.warn("{} check for {}, publishing recovered: {}", new Object[] {this, entity, description}); - entity.emit(getSensorRecovered(), new HASensors.FailureDescriptor(entity, description)); - lastPublished = LastPublished.RECOVERED; - currentFailureStartTime = null; - } else { - long nextAttemptTime = Math.max(serviceRecoveredStabilizationDelay - currentRecoveryPeriod, serviceRecoveredStabilizationDelay - sinceLastDownPeriod); - schedulePublish(nextAttemptTime); - } - } - } else { - if (lastPublished != LastPublished.FAILED) { - // only publish if consistently down for serviceFailedStabilizationDelay - long currentFailurePeriod = getTimeDiff(now, currentFailureStartTime); - long sinceLastUpPeriod = getTimeDiff(now, lastUpTime); - if (currentFailurePeriod > serviceFailedStabilizationDelay && sinceLastUpPeriod > serviceFailedStabilizationDelay) { - String description = getDescription(calculatedStatus); - LOG.warn("{} connectivity-check for {}, publishing failed: {}", new Object[] {this, entity, description}); - entity.emit(getSensorFailed(), new HASensors.FailureDescriptor(entity, description)); - lastPublished = LastPublished.FAILED; - currentRecoveryStartTime = null; - } else { - long nextAttemptTime = Math.max(serviceFailedStabilizationDelay - currentFailurePeriod, serviceFailedStabilizationDelay - sinceLastUpPeriod); - schedulePublish(nextAttemptTime); - } - } - } - } - - protected String getDescription(CalculatedStatus status) { - Long lastUpTime = stateLastGood.get(); - Long 
lastDownTime = stateLastGood.get(); - Duration serviceFailedStabilizationDelay = getFailedStabilizationDelay(); - Duration serviceRecoveredStabilizationDelay = getRecoveredStabilizationDelay(); - - return String.format("%s; healthy=%s; timeNow=%s; lastUp=%s; lastDown=%s; lastPublished=%s; "+ - "currentFailurePeriod=%s; currentRecoveryPeriod=%s", - status.getDescription(), - status.isHealthy(), - Time.makeDateString(System.currentTimeMillis()), - (lastUpTime != null ? Time.makeDateString(lastUpTime) : "<never>"), - (lastDownTime != null ? Time.makeDateString(lastDownTime) : "<never>"), - lastPublished, - (currentFailureStartTime != null ? getTimeStringSince(currentFailureStartTime) : "<none>") + " (stabilization "+makeTimeStringRounded(serviceFailedStabilizationDelay) + ")", - (currentRecoveryStartTime != null ? getTimeStringSince(currentRecoveryStartTime) : "<none>") + " (stabilization "+makeTimeStringRounded(serviceRecoveredStabilizationDelay) + ")"); - } - - private long getTimeDiff(Long recent, Long previous) { - return (previous == null) ? recent : (recent - previous); - } - - private String getTimeStringSince(Long time) { - return time == null ? null : Time.makeTimeStringRounded(System.currentTimeMillis() - time); - } -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/brooklyn/policy/ha/ConditionalSuspendPolicy.java ---------------------------------------------------------------------- diff --git a/policy/src/main/java/brooklyn/policy/ha/ConditionalSuspendPolicy.java b/policy/src/main/java/brooklyn/policy/ha/ConditionalSuspendPolicy.java deleted file mode 100644 index b347949..0000000 --- a/policy/src/main/java/brooklyn/policy/ha/ConditionalSuspendPolicy.java +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.policy.ha; - -import org.apache.brooklyn.api.entity.basic.EntityLocal; -import org.apache.brooklyn.api.event.Sensor; -import org.apache.brooklyn.api.event.SensorEvent; -import org.apache.brooklyn.api.event.SensorEventListener; -import org.apache.brooklyn.api.policy.Policy; -import org.apache.brooklyn.core.policy.basic.AbstractPolicy; -import org.apache.brooklyn.core.util.flags.SetFromFlag; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import brooklyn.config.ConfigKey; -import brooklyn.entity.basic.ConfigKeys; -import brooklyn.util.javalang.JavaClassNames; - -import com.google.common.base.Preconditions; - -public class ConditionalSuspendPolicy extends AbstractPolicy { - private static final Logger LOG = LoggerFactory.getLogger(ConditionalSuspendPolicy.class); - - @SetFromFlag("suppressSensor") - @SuppressWarnings({ "rawtypes", "unchecked" }) - public static final ConfigKey<Sensor<?>> SUSPEND_SENSOR = (ConfigKey) ConfigKeys.newConfigKey(Sensor.class, - "suppressSensor", "Sensor which will suppress the target policy", HASensors.CONNECTION_FAILED); - - @SetFromFlag("resetSensor") - @SuppressWarnings({ "rawtypes", "unchecked" }) - public static final ConfigKey<Sensor<?>> RESUME_SENSOR = (ConfigKey) ConfigKeys.newConfigKey(Sensor.class, - "resetSensor", "Resume target policy when this sensor is observed", HASensors.CONNECTION_RECOVERED); - - 
@SetFromFlag("target") - public static final ConfigKey<Object> SUSPEND_TARGET = ConfigKeys.newConfigKey(Object.class, - "target", "The target policy to suspend. Either direct reference or the value of the suspendTarget config on a policy from the same entity."); - - @Override - public void setEntity(EntityLocal entity) { - super.setEntity(entity); - Object target = config().get(SUSPEND_TARGET); - Preconditions.checkNotNull(target, "Suspend target required"); - Preconditions.checkNotNull(getTargetPolicy(), "Can't find target policy set in " + SUSPEND_TARGET.getName() + ": " + target); - subscribe(); - uniqueTag = JavaClassNames.simpleClassName(getClass())+":"+getConfig(SUSPEND_SENSOR).getName()+":"+getConfig(RESUME_SENSOR).getName(); - } - - private void subscribe() { - subscribe(entity, getConfig(SUSPEND_SENSOR), new SensorEventListener<Object>() { - @Override public void onEvent(final SensorEvent<Object> event) { - if (isRunning()) { - Policy target = getTargetPolicy(); - target.suspend(); - LOG.debug("Suspended policy " + target + ", triggered by " + event.getSensor() + " = " + event.getValue()); - } - } - - }); - subscribe(entity, getConfig(RESUME_SENSOR), new SensorEventListener<Object>() { - @Override public void onEvent(final SensorEvent<Object> event) { - if (isRunning()) { - Policy target = getTargetPolicy(); - target.resume(); - LOG.debug("Resumed policy " + target + ", triggered by " + event.getSensor() + " = " + event.getValue()); - } - } - }); - } - - private Policy getTargetPolicy() { - Object target = config().get(SUSPEND_TARGET); - if (target instanceof Policy) { - return (Policy)target; - } else if (target instanceof String) { - for (Policy policy : entity.getPolicies()) { - // No way to set config values for keys NOT declared in the policy, - // so must use displayName as a generally available config value. 
- if (target.equals(policy.getDisplayName()) || target.equals(policy.getClass().getName())) { - return policy; - } - } - } else { - throw new IllegalStateException("Unexpected type " + target.getClass() + " for target " + target); - } - return null; - } -}
