http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/018a0e15/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/typereg/RegisteredTypePredicates.java ---------------------------------------------------------------------- diff --cc brooklyn-server/core/src/main/java/org/apache/brooklyn/core/typereg/RegisteredTypePredicates.java index 0000000,ebeccd7..2c02874 mode 000000,100644..100644 --- a/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/typereg/RegisteredTypePredicates.java +++ b/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/typereg/RegisteredTypePredicates.java @@@ -1,0 -1,180 +1,257 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.apache.brooklyn.core.typereg; + + import javax.annotation.Nullable; + -import org.apache.brooklyn.api.catalog.BrooklynCatalog; + import org.apache.brooklyn.api.entity.Application; + import org.apache.brooklyn.api.entity.Entity; + import org.apache.brooklyn.api.location.Location; + import org.apache.brooklyn.api.mgmt.ManagementContext; + import org.apache.brooklyn.api.policy.Policy; + import org.apache.brooklyn.api.typereg.RegisteredType; ++import org.apache.brooklyn.api.typereg.RegisteredTypeLoadingContext; + import org.apache.brooklyn.core.mgmt.entitlement.Entitlements; ++import org.apache.brooklyn.util.collections.CollectionFunctionals; + + import com.google.common.base.Function; + import com.google.common.base.Predicate; + import com.google.common.base.Predicates; + + public class RegisteredTypePredicates { + + public static Predicate<RegisteredType> deprecated(final boolean deprecated) { + return new DeprecatedEqualTo(deprecated); + } + + private static class DeprecatedEqualTo implements Predicate<RegisteredType> { + private final boolean deprecated; + + public DeprecatedEqualTo(boolean deprecated) { + this.deprecated = deprecated; + } + @Override + public boolean apply(@Nullable RegisteredType item) { + return (item != null) && item.isDeprecated() == deprecated; + } + } + + public static Predicate<RegisteredType> disabled(boolean disabled) { + return new DisabledEqualTo(disabled); + } + + private static class DisabledEqualTo implements Predicate<RegisteredType> { + private final boolean disabled; + + public DisabledEqualTo(boolean disabled) { + this.disabled = disabled; + } + @Override + public boolean apply(@Nullable RegisteredType item) { + return (item != null) && item.isDisabled() == disabled; + } + } + + public static final Function<RegisteredType,String> ID_OF_ITEM_TRANSFORMER = new IdOfItemTransformer(); + + private static class IdOfItemTransformer implements Function<RegisteredType,String> { + @Override @Nullable + public String apply(@Nullable RegisteredType input) { + if (input==null) return null; + return input.getId(); + } + }; + + public static Predicate<RegisteredType> displayName(final Predicate<? super String> filter) { + return new DisplayNameMatches(filter); + } + + private static class DisplayNameMatches implements Predicate<RegisteredType> { + private final Predicate<? super String> filter; + + public DisplayNameMatches(Predicate<? super String> filter) { + this.filter = filter; + } + @Override + public boolean apply(@Nullable RegisteredType item) { + return (item != null) && filter.apply(item.getDisplayName()); + } + } + ++ public static Predicate<RegisteredType> symbolicName(final String name) { ++ return symbolicName(Predicates.equalTo(name)); ++ } + public static Predicate<RegisteredType> symbolicName(final Predicate<? super String> filter) { + return new SymbolicNameMatches(filter); + } + + private static class SymbolicNameMatches implements Predicate<RegisteredType> { + private final Predicate<? super String> filter; + + public SymbolicNameMatches(Predicate<? super String> filter) { + this.filter = filter; + } + @Override + public boolean apply(@Nullable RegisteredType item) { + return (item != null) && filter.apply(item.getSymbolicName()); + } + } + ++ public static Predicate<RegisteredType> version(final String name) { ++ return version(Predicates.equalTo(name)); ++ } ++ public static Predicate<RegisteredType> version(final Predicate<? super String> filter) { ++ return new VersionMatches(filter); ++ } ++ ++ private static class VersionMatches implements Predicate<RegisteredType> { ++ private final Predicate<? super String> filter; ++ ++ public VersionMatches(Predicate<? super String> filter) { ++ this.filter = filter; ++ } ++ @Override ++ public boolean apply(@Nullable RegisteredType item) { ++ return (item != null) && filter.apply(item.getVersion()); ++ } ++ } ++ ++ public static Predicate<RegisteredType> alias(final String alias) { ++ return aliases(CollectionFunctionals.any(Predicates.equalTo(alias))); ++ } ++ public static Predicate<RegisteredType> aliases(final Predicate<? super Iterable<String>> filter) { ++ return new AliasesMatch(filter); ++ } ++ ++ private static class AliasesMatch implements Predicate<RegisteredType> { ++ private final Predicate<? super Iterable<String>> filter; ++ ++ public AliasesMatch(Predicate<? super Iterable<String>> filter) { ++ this.filter = filter; ++ } ++ @Override ++ public boolean apply(@Nullable RegisteredType item) { ++ return (item != null) && filter.apply(item.getAliases()); ++ } ++ } ++ ++ public static Predicate<RegisteredType> tag(final Object tag) { ++ return tags(CollectionFunctionals.any(Predicates.equalTo(tag))); ++ } ++ public static Predicate<RegisteredType> tags(final Predicate<? super Iterable<Object>> filter) { ++ return new TagsMatch(filter); ++ } ++ ++ private static class TagsMatch implements Predicate<RegisteredType> { ++ private final Predicate<? super Iterable<Object>> filter; ++ ++ public TagsMatch(Predicate<? super Iterable<Object>> filter) { ++ this.filter = filter; ++ } ++ @Override ++ public boolean apply(@Nullable RegisteredType item) { ++ return (item != null) && filter.apply(item.getTags()); ++ } ++ } ++ + public static <T> Predicate<RegisteredType> anySuperType(final Predicate<Class<T>> filter) { + return new AnySuperTypeMatches(filter); + } + @SuppressWarnings({ "unchecked", "rawtypes" }) + public static Predicate<RegisteredType> subtypeOf(final Class<?> filter) { + // the assignableFrom predicate checks if this class is assignable from the subsequent *input*. + // in other words, we're checking if any input is a subtype of this class + return anySuperType((Predicate)Predicates.assignableFrom(filter)); + } + + private static class AnySuperTypeMatches implements Predicate<RegisteredType> { + private final Predicate<Class<?>> filter; + + @SuppressWarnings({ "rawtypes", "unchecked" }) + private AnySuperTypeMatches(Predicate filter) { + this.filter = filter; + } + @Override + public boolean apply(@Nullable RegisteredType item) { + if (item==null) return false; + return RegisteredTypes.isAnyTypeOrSuperSatisfying(item.getSuperTypes(), filter); + } + } + + public static final Predicate<RegisteredType> IS_APPLICATION = subtypeOf(Application.class); + public static final Predicate<RegisteredType> IS_ENTITY = subtypeOf(Entity.class); + public static final Predicate<RegisteredType> IS_LOCATION = subtypeOf(Location.class); + public static final Predicate<RegisteredType> IS_POLICY = subtypeOf(Policy.class); + + public static Predicate<RegisteredType> entitledToSee(final ManagementContext mgmt) { + return new EntitledToSee(mgmt); + } + + private static class EntitledToSee implements Predicate<RegisteredType> { + private final ManagementContext mgmt; + + public EntitledToSee(ManagementContext mgmt) { + this.mgmt = mgmt; + } + @Override + public boolean apply(@Nullable RegisteredType item) { + return (item != null) && + Entitlements.isEntitled(mgmt.getEntitlementManager(), Entitlements.SEE_CATALOG_ITEM, item.getId()); + } + } + + public static Predicate<RegisteredType> isBestVersion(final ManagementContext mgmt) { + return new IsBestVersion(mgmt); + } - + private static class IsBestVersion implements Predicate<RegisteredType> { + private final ManagementContext mgmt; + + public IsBestVersion(ManagementContext mgmt) { + this.mgmt = mgmt; + } + @Override + public boolean apply(@Nullable RegisteredType item) { + return isBestVersion(mgmt, item); + } + } - + public static boolean isBestVersion(ManagementContext mgmt, RegisteredType item) { - RegisteredType bestVersion = mgmt.getTypeRegistry().get(item.getSymbolicName(), BrooklynCatalog.DEFAULT_VERSION); - if (bestVersion==null) return false; - return (bestVersion.getVersion().equals(item.getVersion())); ++ if (item==null) return false; ++ Iterable<RegisteredType> matches = mgmt.getTypeRegistry().getMatching( ++ RegisteredTypePredicates.symbolicName(item.getSymbolicName()) ); ++ if (!matches.iterator().hasNext()) return false; ++ RegisteredType best = RegisteredTypes.getBestVersion(matches); ++ return (best.getVersion().equals(item.getVersion())); ++ } ++ ++ public static Predicate<RegisteredType> satisfies(RegisteredTypeLoadingContext context) { ++ return new SatisfiesContext(context); ++ } ++ private static class SatisfiesContext implements Predicate<RegisteredType> { ++ private final RegisteredTypeLoadingContext context; ++ ++ public SatisfiesContext(RegisteredTypeLoadingContext context) { ++ this.context = context; ++ } ++ @Override ++ public boolean apply(@Nullable RegisteredType item) { ++ return RegisteredTypes.tryValidate(item, context).isPresent(); ++ } + } + + }
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/018a0e15/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/typereg/RegisteredTypes.java ---------------------------------------------------------------------- diff --cc brooklyn-server/core/src/main/java/org/apache/brooklyn/core/typereg/RegisteredTypes.java index 0000000,18f8f43..0c7b09b mode 000000,100644..100644 --- a/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/typereg/RegisteredTypes.java +++ b/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/typereg/RegisteredTypes.java @@@ -1,0 -1,342 +1,426 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.apache.brooklyn.core.typereg; + + import java.lang.reflect.Method; ++import java.util.Comparator; + import java.util.Iterator; + import java.util.Map; + import java.util.Set; + + import javax.annotation.Nullable; + + import org.apache.brooklyn.api.catalog.CatalogItem; + import org.apache.brooklyn.api.internal.AbstractBrooklynObjectSpec; + import org.apache.brooklyn.api.mgmt.ManagementContext; + import org.apache.brooklyn.api.objs.BrooklynObject; + import org.apache.brooklyn.api.typereg.BrooklynTypeRegistry.RegisteredTypeKind; -import org.apache.brooklyn.api.typereg.OsgiBundleWithUrl; + import org.apache.brooklyn.api.typereg.RegisteredType; + import org.apache.brooklyn.api.typereg.RegisteredType.TypeImplementationPlan; + import org.apache.brooklyn.api.typereg.RegisteredTypeLoadingContext; + import org.apache.brooklyn.config.ConfigKey; + import org.apache.brooklyn.core.catalog.internal.CatalogUtils; + import org.apache.brooklyn.core.config.ConfigKeys; + import org.apache.brooklyn.core.objs.BrooklynObjectInternal; + import org.apache.brooklyn.core.typereg.JavaClassNameTypePlanTransformer.JavaClassNameTypeImplementationPlan; + import org.apache.brooklyn.util.exceptions.Exceptions; + import org.apache.brooklyn.util.guava.Maybe; ++import org.apache.brooklyn.util.guava.Maybe.Absent; ++import org.apache.brooklyn.util.text.NaturalOrderComparator; ++import org.apache.brooklyn.util.text.VersionComparator; + import org.apache.brooklyn.util.yaml.Yamls; + + import com.google.common.annotations.Beta; + import com.google.common.base.Function; + import com.google.common.base.Predicate; + import com.google.common.base.Predicates; -import com.google.common.collect.ImmutableList; ++import com.google.common.collect.ComparisonChain; ++import com.google.common.collect.Ordering; + import com.google.common.reflect.TypeToken; + + /** + * Utility and preferred creation mechanisms for working with {@link RegisteredType} instances. + * <p> + * Use {@link #bean(String, String, TypeImplementationPlan, Class)} and {@link #spec(String, String, TypeImplementationPlan, Class)} + * to create {@link RegisteredType} instances. + * <p> + * See {@link #isSubtypeOf(RegisteredType, Class)} or {@link #isSubtypeOf(RegisteredType, RegisteredType)} to + * inspect the type hierarchy. + */ + public class RegisteredTypes { + + @SuppressWarnings("serial") + static ConfigKey<Class<?>> ACTUAL_JAVA_TYPE = ConfigKeys.newConfigKey(new TypeToken<Class<?>>() {}, "java.type.actual", + "The actual Java type which will be instantiated (bean) or pointed at (spec)"); + + /** @deprecated since it was introduced in 0.9.0; for backwards compatibility only, may be removed at any point */ + @Deprecated + static final Function<CatalogItem<?,?>,RegisteredType> CI_TO_RT = new Function<CatalogItem<?,?>, RegisteredType>() { + @Override + public RegisteredType apply(CatalogItem<?, ?> item) { + return of(item); + } + }; + + /** @deprecated since it was introduced in 0.9.0; for backwards compatibility only, may be removed at any point */ + @Deprecated + public static RegisteredType of(CatalogItem<?, ?> item) { + if (item==null) return null; + TypeImplementationPlan impl = null; + if (item.getPlanYaml()!=null) { + impl = new BasicTypeImplementationPlan(null, item.getPlanYaml()); + } else if (item.getJavaType()!=null) { + impl = new JavaClassNameTypeImplementationPlan(item.getJavaType()); + } else { + throw new IllegalStateException("Unsupported catalog item "+item+" when trying to create RegisteredType"); + } + + BasicRegisteredType type = (BasicRegisteredType) spec(item.getSymbolicName(), item.getVersion(), impl, item.getCatalogItemJavaType()); - type.bundles = item.getLibraries()==null ? ImmutableList.<OsgiBundleWithUrl>of() : ImmutableList.<OsgiBundleWithUrl>copyOf(item.getLibraries()); + type.displayName = item.getDisplayName(); + type.description = item.getDescription(); + type.iconUrl = item.getIconUrl(); ++ + type.disabled = item.isDisabled(); + type.deprecated = item.isDeprecated(); ++ if (item.getLibraries()!=null) type.bundles.addAll(item.getLibraries()); ++ // aliases aren't on item ++ if (item.tags()!=null) type.tags.addAll(item.tags().getTags()); + - // TODO - // probably not: javaType, specType, registeredTypeName ... - // maybe: tags ? ++ // these things from item we ignore: javaType, specType, registeredTypeName ... + return type; + } + + /** Preferred mechanism for defining a bean {@link RegisteredType}. */ + public static RegisteredType bean(String symbolicName, String version, TypeImplementationPlan plan, @Nullable Class<?> superType) { + return addSuperType(new BasicRegisteredType(RegisteredTypeKind.BEAN, symbolicName, version, plan), superType); + } + + /** Preferred mechanism for defining a spec {@link RegisteredType}. */ + // TODO we currently allow symbolicName and version to be null for the purposes of creation, internal only in BasicBrooklynTypeRegistry.createSpec + // (ideally the API in TypePlanTransformer can be changed so even that is not needed) + public static RegisteredType spec(String symbolicName, String version, TypeImplementationPlan plan, @Nullable Class<?> superType) { + return addSuperType(new BasicRegisteredType(RegisteredTypeKind.SPEC, symbolicName, version, plan), superType); + } + + /** returns the {@link Class} object corresponding to the given java type name, + * using the cache on the type and the loader defined on the type + * @param mgmt */ + @Beta + // TODO should this be on the AbstractTypePlanTransformer ? + public static Class<?> loadActualJavaType(String javaTypeName, ManagementContext mgmt, RegisteredType type, RegisteredTypeLoadingContext context) { + Class<?> result = ((BasicRegisteredType)type).getCache().get(ACTUAL_JAVA_TYPE); + if (result!=null) return result; + + result = CatalogUtils.newClassLoadingContext(mgmt, type, context==null ? null : context.getLoader()).loadClass( javaTypeName ); + + ((BasicRegisteredType)type).getCache().put(ACTUAL_JAVA_TYPE, result); + return result; + } + + @Beta + public static RegisteredType addSuperType(RegisteredType type, @Nullable Class<?> superType) { + if (superType!=null) { + ((BasicRegisteredType)type).superTypes.add(superType); + } + return type; + } - + @Beta + public static RegisteredType addSuperType(RegisteredType type, @Nullable RegisteredType superType) { + if (superType!=null) { + if (isSubtypeOf(superType, type)) { + throw new IllegalStateException(superType+" declares "+type+" as a supertype; cannot set "+superType+" as a supertype of "+type); + } + ((BasicRegisteredType)type).superTypes.add(superType); + } + return type; + } ++ @Beta ++ public static RegisteredType addSuperTypes(RegisteredType type, Iterable<Object> superTypesAsClassOrRegisteredType) { ++ if (superTypesAsClassOrRegisteredType!=null) { ++ for (Object superType: superTypesAsClassOrRegisteredType) { ++ if (superType==null) { ++ // nothing ++ } else if (superType instanceof Class) { ++ addSuperType(type, (Class<?>)superType); ++ } else if (superType instanceof RegisteredType) { ++ addSuperType(type, (RegisteredType)superType); ++ } else { ++ throw new IllegalStateException(superType+" supplied as a supertype of "+type+" but it is not a supported supertype"); ++ } ++ } ++ } ++ return type; ++ } ++ ++ @Beta ++ public static RegisteredType addAlias(RegisteredType type, String alias) { ++ if (alias!=null) { ++ ((BasicRegisteredType)type).aliases.add( alias ); ++ } ++ return type; ++ } ++ @Beta ++ public static RegisteredType addAliases(RegisteredType type, Iterable<String> aliases) { ++ if (aliases!=null) { ++ for (String alias: aliases) addAlias(type, alias); ++ } ++ return type; ++ } ++ ++ @Beta ++ public static RegisteredType addTag(RegisteredType type, Object tag) { ++ if (tag!=null) { ++ ((BasicRegisteredType)type).tags.add( tag ); ++ } ++ return type; ++ } ++ @Beta ++ public static RegisteredType addTags(RegisteredType type, Iterable<?> tags) { ++ if (tags!=null) { ++ for (Object tag: tags) addTag(type, tag); ++ } ++ return type; ++ } + + /** returns the implementation data for a spec if it is a string (e.g. plan yaml or java class name); else throws */ + @Beta + public static String getImplementationDataStringForSpec(RegisteredType item) { + if (item==null || item.getPlan()==null) return null; + Object data = item.getPlan().getPlanData(); + if (!(data instanceof String)) throw new IllegalStateException("Expected plan data for "+item+" to be a string"); + return (String)data; + } + + /** returns an implementation of the spec class corresponding to the given target type; + * for use in {@link BrooklynTypePlanTransformer#create(RegisteredType, RegisteredTypeLoadingContext)} + * implementations when dealing with a spec; returns null if none found + * @param mgmt */ + @Beta + public static AbstractBrooklynObjectSpec<?,?> newSpecInstance(ManagementContext mgmt, Class<? extends BrooklynObject> targetType) throws Exception { + Class<? extends AbstractBrooklynObjectSpec<?, ?>> specType = RegisteredTypeLoadingContexts.lookupSpecTypeForTarget(targetType); + if (specType==null) return null; + Method createMethod = specType.getMethod("create", Class.class); + return (AbstractBrooklynObjectSpec<?, ?>) createMethod.invoke(null, targetType); + } + + /** Returns a wrapped map, if the object is YAML which parses as a map; + * otherwise returns absent capable of throwing an error with more details */ + @SuppressWarnings("unchecked") + public static Maybe<Map<?,?>> getAsYamlMap(Object planData) { + if (!(planData instanceof String)) return Maybe.absent("not a string"); + Iterable<Object> result; + try { + result = Yamls.parseAll((String)planData); + } catch (Exception e) { + Exceptions.propagateIfFatal(e); + return Maybe.absent(e); + } + Iterator<Object> ri = result.iterator(); + if (!ri.hasNext()) return Maybe.absent("YAML has no elements in it"); + Object r1 = ri.next(); + if (ri.hasNext()) return Maybe.absent("YAML has multiple elements in it"); + if (r1 instanceof Map) return (Maybe<Map<?,?>>)(Maybe<?>) Maybe.of(r1); + return Maybe.absent("YAML does not contain a map"); + } + + /** + * Queries recursively the supertypes of {@link RegisteredType} to see whether it + * inherits from the given {@link RegisteredType} */ + public static boolean isSubtypeOf(RegisteredType type, RegisteredType superType) { + if (type.equals(superType)) return true; + for (Object st: type.getSuperTypes()) { + if (st instanceof RegisteredType) { + if (isSubtypeOf((RegisteredType)st, superType)) return true; + } + } + return false; + } + + /** + * Queries recursively the supertypes of {@link RegisteredType} to see whether it + * inherits from the given {@link Class} */ + public static boolean isSubtypeOf(RegisteredType type, Class<?> superType) { + return isAnyTypeSubtypeOf(type.getSuperTypes(), superType); + } + + /** + * Queries recursively the given types (either {@link Class} or {@link RegisteredType}) + * to see whether any inherit from the given {@link Class} */ + public static boolean isAnyTypeSubtypeOf(Set<Object> candidateTypes, Class<?> superType) { + return isAnyTypeOrSuperSatisfying(candidateTypes, Predicates.assignableFrom(superType)); + } + + /** + * Queries recursively the given types (either {@link Class} or {@link RegisteredType}) + * to see whether any java superclasses satisfy the given {@link Predicate} */ + public static boolean isAnyTypeOrSuperSatisfying(Set<Object> candidateTypes, Predicate<Class<?>> filter) { + for (Object st: candidateTypes) { + if (st instanceof Class) { + if (filter.apply((Class<?>)st)) return true; + } + } + for (Object st: candidateTypes) { + if (st instanceof RegisteredType) { + if (isAnyTypeOrSuperSatisfying(((RegisteredType)st).getSuperTypes(), filter)) return true; + } + } + return false; + } + - public static RegisteredType validate(RegisteredType item, final RegisteredTypeLoadingContext constraint) { - if (item==null || constraint==null) return item; ++ /** Validates that the given type matches the context (if supplied); ++ * if not satisfied. returns an {@link Absent} if failed with details of the error, ++ * with {@link Absent#isNull()} true if the object is null. */ ++ public static Maybe<RegisteredType> tryValidate(RegisteredType item, final RegisteredTypeLoadingContext constraint) { ++ // kept as a Maybe in case someone wants a wrapper around item validity; ++ // unclear what the contract should be, as this can return Maybe.Present(null) ++ // which is suprising, but it is more natural to callers otherwise they'll likely do a separate null check on the item ++ // (often handling null different to errors) so the Maybe.get() is redundant as they have an object for the input anyway. ++ ++ if (item==null || constraint==null) return Maybe.ofDisallowingNull(item); + if (constraint.getExpectedKind()!=null && !constraint.getExpectedKind().equals(item.getKind())) - throw new IllegalStateException(item+" is not the expected kind "+constraint.getExpectedKind()); ++ return Maybe.absent(item+" is not the expected kind "+constraint.getExpectedKind()); + if (constraint.getExpectedJavaSuperType()!=null) { + if (!isSubtypeOf(item, constraint.getExpectedJavaSuperType())) { - throw new IllegalStateException(item+" is not for the expected type "+constraint.getExpectedJavaSuperType()); ++ return Maybe.absent(item+" is not for the expected type "+constraint.getExpectedJavaSuperType()); + } + } - return item; ++ return Maybe.of(item); + } + + /** + * Checks whether the given object appears to be an instance of the given registered type */ + private static boolean isSubtypeOf(Class<?> candidate, RegisteredType type) { + for (Object st: type.getSuperTypes()) { + if (st instanceof RegisteredType) { + if (!isSubtypeOf(candidate, (RegisteredType)st)) return false; + } + if (st instanceof Class) { + if (!((Class<?>)st).isAssignableFrom(candidate)) return false; + } + } + return true; + } + - public static <T> T validate(final T object, final RegisteredType type, final RegisteredTypeLoadingContext constraint) { - RegisteredTypeKind kind = type!=null ? type.getKind() : constraint!=null ? constraint.getExpectedKind() : null; ++ public static RegisteredType getBestVersion(Iterable<RegisteredType> types) { ++ if (types==null || !types.iterator().hasNext()) return null; ++ return Ordering.from(RegisteredTypeComparator.INSTANCE).max(types); ++ } ++ ++ public static class RegisteredTypeComparator implements Comparator<RegisteredType> { ++ public static Comparator<RegisteredType> INSTANCE = new RegisteredTypeComparator(); ++ private RegisteredTypeComparator() {} ++ @Override ++ public int compare(RegisteredType o1, RegisteredType o2) { ++ return ComparisonChain.start() ++ .compareTrueFirst(o1.isDisabled(), o2.isDisabled()) ++ .compareTrueFirst(o1.isDeprecated(), o2.isDeprecated()) ++ .compare(o1.getSymbolicName(), o2.getSymbolicName(), NaturalOrderComparator.INSTANCE) ++ .compare(o1.getVersion(), o2.getVersion(), VersionComparator.INSTANCE) ++ .result(); ++ } ++ } ++ ++ /** validates that the given object (required) satisfies the constraints implied by the given ++ * type and context object, using {@link Maybe} as the result set absent containing the error(s) ++ * if not satisfied. returns an {@link Absent} if failed with details of the error, ++ * with {@link Absent#isNull()} true if the object is null. */ ++ public static <T> Maybe<T> tryValidate(final T object, @Nullable final RegisteredType type, @Nullable final RegisteredTypeLoadingContext context) { ++ if (object==null) return Maybe.absentNull("object is null"); ++ ++ RegisteredTypeKind kind = type!=null ? type.getKind() : context!=null ? context.getExpectedKind() : null; + if (kind==null) { + if (object instanceof AbstractBrooklynObjectSpec) kind=RegisteredTypeKind.SPEC; + else kind=RegisteredTypeKind.BEAN; + } - return new RegisteredTypeKindVisitor<T>() { ++ return new RegisteredTypeKindVisitor<Maybe<T>>() { + @Override - protected T visitSpec() { - return validateSpec(object, type, constraint); ++ protected Maybe<T> visitSpec() { ++ return tryValidateSpec(object, type, context); + } + + @Override - protected T visitBean() { - return validateBean(object, type, constraint); ++ protected Maybe<T> visitBean() { ++ return tryValidateBean(object, type, context); + } + }.visit(kind); + } + - private static <T> T validateBean(T object, RegisteredType type, final RegisteredTypeLoadingContext constraint) { - if (object==null) return null; ++ private static <T> Maybe<T> tryValidateBean(T object, RegisteredType type, final RegisteredTypeLoadingContext context) { ++ if (object==null) return Maybe.absentNull("object is null"); + + if (type!=null) { + if (type.getKind()!=RegisteredTypeKind.BEAN) - throw new IllegalStateException("Validating a bean when type is "+type.getKind()+" "+type); ++ return Maybe.absent("Validating a bean when type is "+type.getKind()+" "+type); + if (!isSubtypeOf(object.getClass(), type)) - throw new IllegalStateException(object+" does not have all the java supertypes of "+type); ++ return Maybe.absent(object+" does not have all the java supertypes of "+type); + } + - if (constraint!=null) { - if (constraint.getExpectedKind()!=RegisteredTypeKind.BEAN) - throw new IllegalStateException("Validating a bean when constraint expected "+constraint.getExpectedKind()); - if (constraint.getExpectedJavaSuperType()!=null && !constraint.getExpectedJavaSuperType().isInstance(object)) - throw new IllegalStateException(object+" is not of the expected java supertype "+constraint.getExpectedJavaSuperType()); ++ if (context!=null) { ++ if (context.getExpectedKind()!=RegisteredTypeKind.BEAN) ++ return Maybe.absent("Validating a bean when constraint expected "+context.getExpectedKind()); ++ if (context.getExpectedJavaSuperType()!=null && !context.getExpectedJavaSuperType().isInstance(object)) ++ return Maybe.absent(object+" is not of the expected java supertype "+context.getExpectedJavaSuperType()); + } + - return object; ++ return Maybe.of(object); + } + - private static <T> T validateSpec(T object, RegisteredType rType, final RegisteredTypeLoadingContext constraint) { - if (object==null) return null; ++ private static <T> Maybe<T> tryValidateSpec(T object, RegisteredType rType, final RegisteredTypeLoadingContext constraint) { ++ if (object==null) return Maybe.absentNull("object is null"); + + if (!(object instanceof AbstractBrooklynObjectSpec)) { - throw new IllegalStateException("Found "+object+" when expecting a spec"); ++ Maybe.absent("Found "+object+" when expecting a spec"); + } + Class<?> targetType = ((AbstractBrooklynObjectSpec<?,?>)object).getType(); + + if (targetType==null) { - throw new IllegalStateException("Spec "+object+" does not have a target type"); ++ Maybe.absent("Spec "+object+" does not have a target type"); + } + + if (rType!=null) { + if (rType.getKind()!=RegisteredTypeKind.SPEC) - throw new IllegalStateException("Validating a spec when type is "+rType.getKind()+" "+rType); ++ Maybe.absent("Validating a spec when type is "+rType.getKind()+" "+rType); + if (!isSubtypeOf(targetType, rType)) - throw new IllegalStateException(object+" does not have all the java supertypes of "+rType); ++ Maybe.absent(object+" does not have all the java supertypes of "+rType); + } + + if (constraint!=null) { + if (constraint.getExpectedJavaSuperType()!=null) { + if (!constraint.getExpectedJavaSuperType().isAssignableFrom(targetType)) { - throw new IllegalStateException(object+" does not target the expected java supertype "+constraint.getExpectedJavaSuperType()); ++ Maybe.absent(object+" does not target the expected java supertype "+constraint.getExpectedJavaSuperType()); + } + if (constraint.getExpectedJavaSuperType().isAssignableFrom(BrooklynObjectInternal.class)) { + // don't check spec type; any spec is acceptable + } else { + @SuppressWarnings("unchecked") + Class<? extends AbstractBrooklynObjectSpec<?, ?>> specType = RegisteredTypeLoadingContexts.lookupSpecTypeForTarget( (Class<? extends BrooklynObject>) constraint.getExpectedJavaSuperType()); + if (specType==null) { + // means a problem in our classification of spec types! - throw new IllegalStateException(object+" is returned as spec for unexpected java supertype "+constraint.getExpectedJavaSuperType()); ++ Maybe.absent(object+" is returned as spec for unexpected java supertype "+constraint.getExpectedJavaSuperType()); + } + if (!specType.isAssignableFrom(object.getClass())) { - throw new IllegalStateException(object+" is not a spec of the expected java supertype "+constraint.getExpectedJavaSuperType()); ++ Maybe.absent(object+" is not a spec of the expected java supertype "+constraint.getExpectedJavaSuperType()); + } + } + } + } - return object; ++ return Maybe.of(object); + } + + } http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/018a0e15/brooklyn-server/core/src/main/java/org/apache/brooklyn/enricher/stock/reducer/Reducer.java ---------------------------------------------------------------------- diff --cc brooklyn-server/core/src/main/java/org/apache/brooklyn/enricher/stock/reducer/Reducer.java index 0000000,db3a72d..558d50a mode 000000,100644..100644 --- a/brooklyn-server/core/src/main/java/org/apache/brooklyn/enricher/stock/reducer/Reducer.java +++ b/brooklyn-server/core/src/main/java/org/apache/brooklyn/enricher/stock/reducer/Reducer.java @@@ -1,0 -1,138 +1,138 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.apache.brooklyn.enricher.stock.reducer; + + import java.util.List; + import java.util.Map; + import java.util.Objects; + + import org.apache.brooklyn.api.entity.Entity; + import org.apache.brooklyn.api.entity.EntityLocal; + import org.apache.brooklyn.api.sensor.AttributeSensor; + import org.apache.brooklyn.api.sensor.Sensor; + import org.apache.brooklyn.api.sensor.SensorEvent; + import org.apache.brooklyn.api.sensor.SensorEventListener; + import org.apache.brooklyn.config.ConfigKey; + import org.apache.brooklyn.core.config.ConfigKeys; + import org.apache.brooklyn.core.enricher.AbstractEnricher; + import org.apache.brooklyn.util.core.flags.SetFromFlag; + import org.apache.brooklyn.util.core.sensor.SensorPredicates; + import org.apache.brooklyn.util.core.task.Tasks; + import org.apache.brooklyn.util.core.task.ValueResolver; + import org.apache.brooklyn.util.text.StringFunctions; + import org.slf4j.Logger; + import org.slf4j.LoggerFactory; + -import com.google.api.client.util.Lists; + import com.google.common.base.Function; + import com.google.common.base.Optional; + import com.google.common.base.Preconditions; + import com.google.common.collect.ImmutableList; + import com.google.common.collect.Iterables; ++import com.google.common.collect.Lists; + import com.google.common.reflect.TypeToken; + + @SuppressWarnings("serial") + public class Reducer extends AbstractEnricher implements SensorEventListener<Object> { + + private static final Logger LOG = LoggerFactory.getLogger(Reducer.class); + + @SetFromFlag("producer") + public static ConfigKey<Entity> PRODUCER = ConfigKeys.newConfigKey(Entity.class, "enricher.producer"); + public static ConfigKey<Sensor<?>> TARGET_SENSOR = ConfigKeys.newConfigKey(new TypeToken<Sensor<?>>() {}, "enricher.targetSensor"); + public static ConfigKey<List<? extends AttributeSensor<?>>> SOURCE_SENSORS = ConfigKeys.newConfigKey(new TypeToken<List<? extends AttributeSensor<?>>>() {}, "enricher.sourceSensors"); + public static ConfigKey<Function<List<?>,?>> REDUCER_FUNCTION = ConfigKeys.newConfigKey(new TypeToken<Function<List<?>, ?>>() {}, "enricher.reducerFunction"); + @SetFromFlag("transformation") + public static final ConfigKey<String> REDUCER_FUNCTION_TRANSFORMATION = ConfigKeys.newStringConfigKey("enricher.reducerFunction.transformation", + "A string matching a pre-defined named reducer function, such as joiner"); + public static final ConfigKey<Map<String, Object>> PARAMETERS = ConfigKeys.newConfigKey(new TypeToken<Map<String, Object>>() {}, "enricher.reducerFunction.parameters", + "A map of parameters to pass into the reducer function"); + + protected Entity producer; + protected List<AttributeSensor<?>> subscribedSensors; + protected Sensor<?> targetSensor; + protected Function<Iterable<?>, ?> reducerFunction; + + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Override + public void setEntity(EntityLocal entity) { + super.setEntity(entity); + Preconditions.checkNotNull(getConfig(SOURCE_SENSORS), "source sensors"); + + this.producer = getConfig(PRODUCER) == null ? entity : getConfig(PRODUCER); + List<AttributeSensor<?>> sensorListTemp = Lists.newArrayList(); + + for (Object sensorO : getConfig(SOURCE_SENSORS)) { + AttributeSensor<?> sensor = Tasks.resolving(sensorO).as(AttributeSensor.class).timeout(ValueResolver.REAL_QUICK_WAIT).context(producer).get(); + Optional<? extends Sensor<?>> foundSensor = Iterables.tryFind(sensorListTemp, + SensorPredicates.nameEqualTo(sensor.getName())); + + if(!foundSensor.isPresent()) { + sensorListTemp.add(sensor); + } + } + + String reducerName = config().get(REDUCER_FUNCTION_TRANSFORMATION); + Function<Iterable<?>, ?> reducerFunction = (Function) config().get(REDUCER_FUNCTION); + if(reducerFunction == null){ + Map<String, ?> parameters = config().get(PARAMETERS); + reducerFunction = createReducerFunction(reducerName, parameters); + } + + this.reducerFunction = reducerFunction; + Preconditions.checkState(sensorListTemp.size() > 0, "Nothing to reduce"); + + for (Sensor<?> sensor : sensorListTemp) { + subscribe(producer, sensor, this); + } + + subscribedSensors = ImmutableList.copyOf(sensorListTemp); + } + + // Default implementation, subclasses should override + protected Function<Iterable<?>, ?> createReducerFunction(String reducerName, Map<String, ?> parameters){ + if(Objects.equals(reducerName, "joiner")){ + String separator = (String) parameters.get("separator"); + return StringFunctions.joiner(separator == null ? ", " : separator); + } + + if (Objects.equals(reducerName, "formatString")){ + String format = Preconditions.checkNotNull((String)parameters.get("format"), "format"); + return StringFunctions.formatterForIterable(format); + } + throw new IllegalStateException("unknown function: " + reducerName); + } + + @Override + public void onEvent(SensorEvent<Object> event) { + Sensor<?> destinationSensor = getConfig(TARGET_SENSOR); + + List<Object> values = Lists.newArrayList(); + + for (AttributeSensor<?> sourceSensor : subscribedSensors) { + Object resolvedSensorValue = entity.sensors().get(sourceSensor); + values.add(resolvedSensorValue); + } + Object result = reducerFunction.apply(values); + + if (LOG.isTraceEnabled()) LOG.trace("enricher {} got {}, propagating via {} as {}", + new Object[] {this, event, entity, reducerFunction, destinationSensor}); + + emit(destinationSensor, result); + } + } http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/018a0e15/brooklyn-server/core/src/main/java/org/apache/brooklyn/entity/group/DynamicClusterImpl.java ---------------------------------------------------------------------- diff --cc brooklyn-server/core/src/main/java/org/apache/brooklyn/entity/group/DynamicClusterImpl.java index 0000000,756f665..b8e5c63 mode 000000,100644..100644 --- a/brooklyn-server/core/src/main/java/org/apache/brooklyn/entity/group/DynamicClusterImpl.java +++ b/brooklyn-server/core/src/main/java/org/apache/brooklyn/entity/group/DynamicClusterImpl.java @@@ -1,0 -1,971 +1,972 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.apache.brooklyn.entity.group; + + import static com.google.common.base.Preconditions.checkArgument; + import static com.google.common.base.Preconditions.checkNotNull; + + import java.util.Collection; + import java.util.Collections; + import java.util.Iterator; + import java.util.List; + import java.util.Map; + import java.util.NoSuchElementException; + import java.util.Set; + import java.util.concurrent.Callable; + import java.util.concurrent.atomic.AtomicInteger; + + import javax.annotation.Nullable; + + import org.apache.brooklyn.api.entity.Entity; + import org.apache.brooklyn.api.entity.EntitySpec; + import org.apache.brooklyn.api.entity.Group; + import org.apache.brooklyn.api.location.Location; + import org.apache.brooklyn.api.location.MachineProvisioningLocation; + import org.apache.brooklyn.api.mgmt.Task; + import org.apache.brooklyn.api.policy.Policy; + import org.apache.brooklyn.api.sensor.AttributeSensor; ++import org.apache.brooklyn.core.config.Sanitizer; + import org.apache.brooklyn.core.config.render.RendererHints; + import org.apache.brooklyn.core.effector.Effectors; + import org.apache.brooklyn.core.entity.Entities; + import org.apache.brooklyn.core.entity.factory.EntityFactory; + import org.apache.brooklyn.core.entity.factory.EntityFactoryForLocation; + import org.apache.brooklyn.core.entity.lifecycle.Lifecycle; + import org.apache.brooklyn.core.entity.lifecycle.ServiceStateLogic; + import org.apache.brooklyn.core.entity.lifecycle.ServiceStateLogic.ServiceProblemsLogic; + import org.apache.brooklyn.core.entity.trait.Startable; + import org.apache.brooklyn.core.entity.trait.StartableMethods; + import org.apache.brooklyn.core.location.Locations; + import org.apache.brooklyn.core.location.cloud.AvailabilityZoneExtension; + import org.apache.brooklyn.core.sensor.Sensors; + import org.apache.brooklyn.entity.stock.DelegateEntity; + import org.apache.brooklyn.feed.function.FunctionFeed; + import org.apache.brooklyn.feed.function.FunctionPollConfig; + import org.apache.brooklyn.util.collections.MutableList; + import org.apache.brooklyn.util.collections.MutableMap; + import org.apache.brooklyn.util.collections.QuorumCheck.QuorumChecks; + import org.apache.brooklyn.util.core.flags.TypeCoercions; + import org.apache.brooklyn.util.core.task.DynamicTasks; + import org.apache.brooklyn.util.core.task.TaskTags; + import org.apache.brooklyn.util.core.task.Tasks; + import org.apache.brooklyn.util.exceptions.Exceptions; + import org.apache.brooklyn.util.exceptions.ReferenceWithError; + import org.apache.brooklyn.util.guava.Maybe; + import org.apache.brooklyn.util.javalang.JavaClassNames; + import org.apache.brooklyn.util.javalang.Reflections; + import org.apache.brooklyn.util.text.StringPredicates; + import org.apache.brooklyn.util.text.Strings; + import org.apache.brooklyn.util.time.Duration; + import org.slf4j.Logger; + import org.slf4j.LoggerFactory; + + import com.google.common.base.Function; + import com.google.common.base.Functions; + import com.google.common.base.Optional; + import com.google.common.base.Preconditions; + import com.google.common.base.Predicates; + import com.google.common.base.Supplier; + import com.google.common.collect.ImmutableList; + import com.google.common.collect.ImmutableMap; + import com.google.common.collect.ImmutableSet; + import com.google.common.collect.Iterables; + import com.google.common.collect.LinkedHashMultimap; + import com.google.common.collect.Lists; + import com.google.common.collect.Maps; + import com.google.common.collect.Multimap; + import com.google.common.collect.Sets; + import com.google.common.reflect.TypeToken; + + /** + * A cluster of entities that can dynamically increase or decrease the number of entities. + */ + public class DynamicClusterImpl extends AbstractGroupImpl implements DynamicCluster { + + @SuppressWarnings("serial") + private static final AttributeSensor<Supplier<Integer>> NEXT_CLUSTER_MEMBER_ID = Sensors.newSensor(new TypeToken<Supplier<Integer>>() {}, + "next.cluster.member.id", "Returns the ID number of the next member to be added"); + + private volatile FunctionFeed clusterOneAndAllMembersUp; + + // TODO better mechanism for arbitrary class name to instance type coercion + static { + TypeCoercions.registerAdapter(String.class, NodePlacementStrategy.class, new Function<String, NodePlacementStrategy>() { + @Override + public NodePlacementStrategy apply(final String input) { + ClassLoader classLoader = NodePlacementStrategy.class.getClassLoader(); + Optional<NodePlacementStrategy> strategy = Reflections.<NodePlacementStrategy>invokeConstructorWithArgs(classLoader, input); + if (strategy.isPresent()) { + return strategy.get(); + } else { + throw new IllegalStateException("Failed to create NodePlacementStrategy "+input); + } + } + }); + TypeCoercions.registerAdapter(String.class, ZoneFailureDetector.class, new Function<String, ZoneFailureDetector>() { + @Override + public ZoneFailureDetector apply(final String input) { + ClassLoader classLoader = ZoneFailureDetector.class.getClassLoader(); + Optional<ZoneFailureDetector> detector = Reflections.<ZoneFailureDetector>invokeConstructorWithArgs(classLoader, input); + if (detector.isPresent()) { + return detector.get(); + } else { + throw new IllegalStateException("Failed to create ZoneFailureDetector "+input); + } + } + }); + } + + static { + RendererHints.register(FIRST, RendererHints.namedActionWithUrl("Open", DelegateEntity.EntityUrl.entityUrl())); + RendererHints.register(CLUSTER, RendererHints.namedActionWithUrl("Open", DelegateEntity.EntityUrl.entityUrl())); + } + + + private static final Logger LOG = LoggerFactory.getLogger(DynamicClusterImpl.class); + + /** + * Mutex for synchronizing during re-size operations. + * Sub-classes should use with great caution, to not introduce deadlocks! + */ + protected final Object mutex = new Object[0]; + + private static final Function<Collection<Entity>, Entity> defaultRemovalStrategy = new Function<Collection<Entity>, Entity>() { + @Override public Entity apply(Collection<Entity> contenders) { + /* + * Choose the newest entity (largest cluster member ID or latest timestamp) that is stoppable. + * If none are stoppable, take the newest non-stoppable. + * + * Both cluster member ID and timestamp must be taken into consideration to account for legacy + * clusters that were created before the addition of the cluster member ID config value. + */ + int largestClusterMemberId = -1; + long newestTime = 0L; + Entity newest = null; + + for (Entity contender : contenders) { + Integer contenderClusterMemberId = contender.config().get(CLUSTER_MEMBER_ID); + long contenderCreationTime = contender.getCreationTime(); + + boolean newer = (contenderClusterMemberId != null && contenderClusterMemberId > largestClusterMemberId) || + contenderCreationTime > newestTime; + + if ((contender instanceof Startable && newer) || + (!(newest instanceof Startable) && ((contender instanceof Startable) || newer))) { + newest = contender; + + if (contenderClusterMemberId != null) largestClusterMemberId = contenderClusterMemberId; + newestTime = contenderCreationTime; + } + } + + return newest; + } + }; + + private static class NextClusterMemberIdSupplier implements Supplier<Integer> { + private AtomicInteger nextId = new AtomicInteger(0); + + @Override + public Integer get() { + return nextId.getAndIncrement(); + } + } + + public DynamicClusterImpl() { + } + + @Override + public void init() { + super.init(); + initialiseMemberId(); + connectAllMembersUp(); + } + + private void initialiseMemberId() { + synchronized (mutex) { + if (sensors().get(NEXT_CLUSTER_MEMBER_ID) == null) { + sensors().set(NEXT_CLUSTER_MEMBER_ID, new NextClusterMemberIdSupplier()); + } + } + } + + private void connectAllMembersUp() { + clusterOneAndAllMembersUp = FunctionFeed.builder() + .entity(this) + .period(Duration.FIVE_SECONDS) + .poll(new FunctionPollConfig<Boolean, Boolean>(CLUSTER_ONE_AND_ALL_MEMBERS_UP) + .onException(Functions.constant(Boolean.FALSE)) + .callable(new ClusterOneAndAllMembersUpCallable(this))) + .build(); + } + + private static class ClusterOneAndAllMembersUpCallable implements Callable<Boolean> { + + private final Group cluster; + + public ClusterOneAndAllMembersUpCallable(Group cluster) { + this.cluster = cluster; + } + + @Override + public Boolean call() throws Exception { + if (cluster.getMembers().isEmpty()) + return false; + + if (Lifecycle.RUNNING != cluster.sensors().get(SERVICE_STATE_ACTUAL)) + return false; + + for (Entity member : cluster.getMembers()) + if (!Boolean.TRUE.equals(member.sensors().get(SERVICE_UP))) + return false; + + return true; + } + } + + @Override + protected void initEnrichers() { + if (config().getRaw(UP_QUORUM_CHECK).isAbsent() && getConfig(INITIAL_SIZE)==0) { + // if initial size is 0 then override up check to allow zero if empty + config().set(UP_QUORUM_CHECK, QuorumChecks.atLeastOneUnlessEmpty()); + sensors().set(SERVICE_UP, true); + } else { + sensors().set(SERVICE_UP, false); + } + super.initEnrichers(); + // override previous enricher so that only members are checked + ServiceStateLogic.newEnricherFromChildrenUp().checkMembersOnly().requireUpChildren(getConfig(UP_QUORUM_CHECK)).addTo(this); + } + + @Override + public void setRemovalStrategy(Function<Collection<Entity>, Entity> val) { + config().set(REMOVAL_STRATEGY, checkNotNull(val, "removalStrategy")); + } + + protected Function<Collection<Entity>, Entity> getRemovalStrategy() { + Function<Collection<Entity>, Entity> result = getConfig(REMOVAL_STRATEGY); + return (result != null) ? result : defaultRemovalStrategy; + } + + @Override + public void setZonePlacementStrategy(NodePlacementStrategy val) { + config().set(ZONE_PLACEMENT_STRATEGY, checkNotNull(val, "zonePlacementStrategy")); + } + + protected NodePlacementStrategy getZonePlacementStrategy() { + return checkNotNull(getConfig(ZONE_PLACEMENT_STRATEGY), "zonePlacementStrategy config"); + } + + @Override + public void setZoneFailureDetector(ZoneFailureDetector val) { + config().set(ZONE_FAILURE_DETECTOR, checkNotNull(val, "zoneFailureDetector")); + } + + protected ZoneFailureDetector getZoneFailureDetector() { + return checkNotNull(getConfig(ZONE_FAILURE_DETECTOR), "zoneFailureDetector config"); + } + + protected EntitySpec<?> getFirstMemberSpec() { + return getConfig(FIRST_MEMBER_SPEC); + } + + protected EntitySpec<?> getMemberSpec() { + return getConfig(MEMBER_SPEC); + } + + /** @deprecated since 0.7.0; use {@link #getMemberSpec()} */ + @Deprecated + protected EntityFactory<?> getFactory() { + return getConfig(FACTORY); + } + + @Override + public void setMemberSpec(EntitySpec<?> memberSpec) { + setConfigEvenIfOwned(MEMBER_SPEC, memberSpec); + } + + /** @deprecated since 0.7.0; use {@link #setMemberSpec(EntitySpec)} */ + @Deprecated + @Override + public void setFactory(EntityFactory<?> factory) { + setConfigEvenIfOwned(FACTORY, factory); + } + + private Location getLocation() { + Collection<? extends Location> ll = Locations.getLocationsCheckingAncestors(getLocations(), this); + try { + return Iterables.getOnlyElement(ll); + } catch (Exception e) { + Exceptions.propagateIfFatal(e); + if (ll.isEmpty()) throw new IllegalStateException("No location available for "+this); + else throw new IllegalStateException("Ambiguous location for "+this+"; expected one but had "+ll); + } + } + + protected boolean isAvailabilityZoneEnabled() { + return getConfig(ENABLE_AVAILABILITY_ZONES); + } + + protected boolean isQuarantineEnabled() { + return getConfig(QUARANTINE_FAILED_ENTITIES); + } + + protected QuarantineGroup getQuarantineGroup() { + return getAttribute(QUARANTINE_GROUP); + } + + protected int getInitialQuorumSize() { + int initialSize = getConfig(INITIAL_SIZE).intValue(); + int initialQuorumSize = getConfig(INITIAL_QUORUM_SIZE).intValue(); + if (initialQuorumSize < 0) initialQuorumSize = initialSize; + if (initialQuorumSize > initialSize) { + LOG.warn("On start of cluster {}, misconfigured initial quorum size {} greater than initial size{}; using {}", new Object[] {initialQuorumSize, initialSize, initialSize}); + initialQuorumSize = initialSize; + } + return initialQuorumSize; + } + + @Override + public void start(Collection<? extends Location> locsO) { + if (locsO!=null) { + checkArgument(locsO.size() <= 1, "Wrong number of locations supplied to start %s: %s", this, locsO); + addLocations(locsO); + } + Location loc = getLocation(); + + EntitySpec<?> spec = getConfig(MEMBER_SPEC); + if (spec!=null) { + setDefaultDisplayName("Cluster of "+JavaClassNames.simpleClassName(spec.getType()) +" ("+loc+")"); + } + + if (isAvailabilityZoneEnabled()) { + sensors().set(SUB_LOCATIONS, findSubLocations(loc)); + } + + ServiceStateLogic.setExpectedState(this, Lifecycle.STARTING); + ServiceProblemsLogic.clearProblemsIndicator(this, START); + try { + doStart(); + DynamicTasks.waitForLast(); + + } catch (Exception e) { + ServiceProblemsLogic.updateProblemsIndicator(this, START, "start failed with error: "+e); + throw Exceptions.propagate(e); + } finally { + ServiceStateLogic.setExpectedState(this, Lifecycle.RUNNING); + } + } + + protected void doStart() { + if (isQuarantineEnabled()) { + QuarantineGroup quarantineGroup = getAttribute(QUARANTINE_GROUP); + if (quarantineGroup==null || !Entities.isManaged(quarantineGroup)) { + quarantineGroup = addChild(EntitySpec.create(QuarantineGroup.class).displayName("quarantine")); + sensors().set(QUARANTINE_GROUP, quarantineGroup); + } + } + + int initialSize = getConfig(INITIAL_SIZE).intValue(); + int initialQuorumSize = getInitialQuorumSize(); + Exception internalError = null; + + try { + resize(initialSize); + } catch (Exception e) { + Exceptions.propagateIfFatal(e); + // Apart from logging, ignore problems here; we extract them below. + // But if it was this thread that threw the exception (rather than a sub-task), then need + // to record that failure here. + LOG.debug("Error resizing "+this+" to size "+initialSize+" (collecting and handling): "+e, e); + internalError = e; + } + + Iterable<Task<?>> failed = Tasks.failed(Tasks.children(Tasks.current())); + boolean noFailed = Iterables.isEmpty(failed); + boolean severalFailed = Iterables.size(failed) > 1; + + int currentSize = getCurrentSize().intValue(); + if (currentSize < initialQuorumSize) { + String message; + if (currentSize == 0 && !noFailed) { + if (severalFailed) + message = "All nodes in cluster "+this+" failed"; + else + message = "Node in cluster "+this+" failed"; + } else { + message = "On start of cluster " + this + ", failed to get to initial size of " + initialSize + + "; size is " + getCurrentSize() + + (initialQuorumSize != initialSize ? " (initial quorum size is " + initialQuorumSize + ")" : ""); + } + Throwable firstError = Tasks.getError(Maybe.next(failed.iterator()).orNull()); + if (firstError==null && internalError!=null) { + // only use the internal error if there were no nested task failures + // (otherwise the internal error should be a wrapper around the nested failures) + firstError = internalError; + } + if (firstError!=null) { + if (severalFailed) { + message += "; first failure is: "+Exceptions.collapseText(firstError); + } else { + message += ": "+Exceptions.collapseText(firstError); + } + } + throw new IllegalStateException(message, firstError); + + } else if (currentSize < initialSize) { + LOG.warn( + "On start of cluster {}, size {} reached initial minimum quorum size of {} but did not reach desired size {}; continuing", + new Object[] { this, currentSize, initialQuorumSize, initialSize }); + } + + for (Policy it : policies()) { + it.resume(); + } + } + + protected List<Location> findSubLocations(Location loc) { + if (!loc.hasExtension(AvailabilityZoneExtension.class)) { + throw new IllegalStateException("Availability zone extension not supported for location " + loc); + } + + AvailabilityZoneExtension zoneExtension = loc.getExtension(AvailabilityZoneExtension.class); + + Collection<String> zoneNames = getConfig(AVAILABILITY_ZONE_NAMES); + Integer numZones = getConfig(NUM_AVAILABILITY_ZONES); + + List<Location> subLocations; + if (zoneNames == null || zoneNames.isEmpty()) { + if (numZones != null) { + subLocations = zoneExtension.getSubLocations(numZones); + + checkArgument(numZones > 0, "numZones must be greater than zero: %s", numZones); + if (numZones > subLocations.size()) { + throw new IllegalStateException("Number of required zones (" + numZones + ") not satisfied in " + loc + + "; only " + subLocations.size() + " available: " + subLocations); + } + } else { + subLocations = zoneExtension.getAllSubLocations(); + } + } else { + // TODO check that these are valid region / availabilityZones? + subLocations = zoneExtension.getSubLocationsByName(StringPredicates.equalToAny(zoneNames), zoneNames.size()); + + if (zoneNames.size() > subLocations.size()) { + throw new IllegalStateException("Number of required zones (" + zoneNames.size() + " - " + zoneNames + + ") not satisfied in " + loc + "; only " + subLocations.size() + " available: " + subLocations); + } + } + + LOG.info("Returning {} sub-locations: {}", subLocations.size(), Iterables.toString(subLocations)); + return subLocations; + } + + @Override + public void stop() { + ServiceStateLogic.setExpectedState(this, Lifecycle.STOPPING); + try { + for (Policy it : policies()) { it.suspend(); } + + // run shrink without mutex to make things stop even if starting, + int size = getCurrentSize(); + if (size > 0) { shrink(-size); } + + // run resize with mutex to prevent others from starting things + resize(0); + + // also stop any remaining stoppable children -- eg those on fire + // (this ignores the quarantine node which is not stoppable) + StartableMethods.stop(this); + + ServiceStateLogic.setExpectedState(this, Lifecycle.STOPPED); + } catch (Exception e) { + ServiceStateLogic.setExpectedState(this, Lifecycle.ON_FIRE); + throw Exceptions.propagate(e); + } finally { + if (clusterOneAndAllMembersUp != null) clusterOneAndAllMembersUp.stop(); + } + } + + @Override + public void restart() { + throw new UnsupportedOperationException(); + } + + @Override + public Integer resize(Integer desiredSize) { + synchronized (mutex) { + int originalSize = getCurrentSize(); + int delta = desiredSize - originalSize; + if (delta != 0) { + LOG.info("Resize {} from {} to {}", new Object[] {this, originalSize, desiredSize}); + } else { + if (LOG.isDebugEnabled()) LOG.debug("Resize no-op {} from {} to {}", new Object[] {this, originalSize, desiredSize}); + } + resizeByDelta(delta); + } + return getCurrentSize(); + } + + /** + * {@inheritDoc} + * + * <strong>Note</strong> for sub-classes; this method can be called while synchronized on {@link #mutex}. + */ + @Override + public String replaceMember(String memberId) { + Entity member = getEntityManager().getEntity(memberId); + LOG.info("In {}, replacing member {} ({})", new Object[] {this, memberId, member}); + + if (member == null) { + throw new NoSuchElementException("In "+this+", entity "+memberId+" cannot be resolved, so not replacing"); + } + + synchronized (mutex) { + if (!getMembers().contains(member)) { + throw new NoSuchElementException("In "+this+", entity "+member+" is not a member so not replacing"); + } + + Location memberLoc = null; + if (isAvailabilityZoneEnabled()) { + // this member's location could be a machine provisioned by a sub-location, or the actual sub-location + List<Location> subLocations = findSubLocations(getLocation()); + Collection<Location> actualMemberLocs = member.getLocations(); + boolean foundMatch = false; + for (Iterator<Location> iter = actualMemberLocs.iterator(); !foundMatch && iter.hasNext();) { + Location actualMemberLoc = iter.next(); + Location contenderMemberLoc = actualMemberLoc; + do { + if (subLocations.contains(contenderMemberLoc)) { + memberLoc = contenderMemberLoc; + foundMatch = true; + LOG.debug("In {} replacing member {} ({}), inferred its sub-location is {}", new Object[] {this, memberId, member, memberLoc}); + } + contenderMemberLoc = contenderMemberLoc.getParent(); + } while (!foundMatch && contenderMemberLoc != null); + } + if (!foundMatch) { + if (actualMemberLocs.isEmpty()) { + memberLoc = subLocations.get(0); + LOG.warn("In {} replacing member {} ({}), has no locations; falling back to first availability zone: {}", new Object[] {this, memberId, member, memberLoc}); + } else { + memberLoc = Iterables.tryFind(actualMemberLocs, Predicates.instanceOf(MachineProvisioningLocation.class)).or(Iterables.getFirst(actualMemberLocs, null)); + LOG.warn("In {} replacing member {} ({}), could not find matching sub-location; falling back to its actual location: {}", new Object[] {this, memberId, member, memberLoc}); + } + } else if (memberLoc == null) { + // impossible to get here, based on logic above! + throw new IllegalStateException("Unexpected condition! cluster="+this+"; member="+member+"; actualMemberLocs="+actualMemberLocs); + } + } else { + // Replacing member, so new member should be in the same location as that being replaced. + // Expect this to agree with `getMemberSpec().getLocations()` (if set). If not, then + // presumably there was a reason this specific member was started somewhere else! + memberLoc = getLocation(); + } + + Entity replacement = replaceMember(member, memberLoc, ImmutableMap.of()); + return replacement.getId(); + } + } + + /** + * @throws StopFailedRuntimeException If stop failed, after successfully starting replacement + */ + protected Entity replaceMember(Entity member, Location memberLoc, Map<?, ?> extraFlags) { + synchronized (mutex) { + ReferenceWithError<Optional<Entity>> added = addInSingleLocation(memberLoc, extraFlags); + + if (!added.getWithoutError().isPresent()) { + String msg = String.format("In %s, failed to grow, to replace %s; not removing", this, member); + if (added.hasError()) + throw new IllegalStateException(msg, added.getError()); + throw new IllegalStateException(msg); + } + + try { + stopAndRemoveNode(member); + } catch (Exception e) { + Exceptions.propagateIfFatal(e); + throw new StopFailedRuntimeException("replaceMember failed to stop and remove old member "+member.getId(), e); + } + + return added.getWithError().get(); + } + } + + protected Multimap<Location, Entity> getMembersByLocation() { + Multimap<Location, Entity> result = LinkedHashMultimap.create(); + for (Entity member : getMembers()) { + Collection<Location> memberLocs = member.getLocations(); + Location memberLoc = Iterables.getFirst(memberLocs, null); + if (memberLoc != null) { + result.put(memberLoc, member); + } + } + return result; + } + + protected List<Location> getNonFailedSubLocations() { + List<Location> result = Lists.newArrayList(); + Set<Location> failed = Sets.newLinkedHashSet(); + List<Location> subLocations = findSubLocations(getLocation()); + Set<Location> oldFailedSubLocations = getAttribute(FAILED_SUB_LOCATIONS); + if (oldFailedSubLocations == null) + oldFailedSubLocations = ImmutableSet.<Location> of(); + + for (Location subLocation : subLocations) { + if (getZoneFailureDetector().hasFailed(subLocation)) { + failed.add(subLocation); + } else { + result.add(subLocation); + } + } + + Set<Location> newlyFailed = Sets.difference(failed, oldFailedSubLocations); + Set<Location> newlyRecovered = Sets.difference(oldFailedSubLocations, failed); + sensors().set(FAILED_SUB_LOCATIONS, failed); + sensors().set(SUB_LOCATIONS, result); + if (newlyFailed.size() > 0) { + LOG.warn("Detected probably zone failures for {}: {}", this, newlyFailed); + } + if (newlyRecovered.size() > 0) { + LOG.warn("Detected probably zone recoveries for {}: {}", this, newlyRecovered); + } + + return result; + } + + /** + * {@inheritDoc} + * + * <strong>Note</strong> for sub-classes; this method can be called while synchronized on {@link #mutex}. + */ + @Override + public Collection<Entity> resizeByDelta(int delta) { + synchronized (mutex) { + if (delta > 0) { + return grow(delta); + } else if (delta < 0) { + return shrink(delta); + } else { + return ImmutableList.<Entity>of(); + } + } + } + + /** <strong>Note</strong> for sub-clases; this method can be called while synchronized on {@link #mutex}. */ + protected Collection<Entity> grow(int delta) { + Preconditions.checkArgument(delta > 0, "Must call grow with positive delta."); + + // choose locations to be deployed to + List<Location> chosenLocations; + List<Location> memberLocations = getMemberSpec() == null ? null : getMemberSpec().getLocations(); + if (memberLocations != null && memberLocations.size() > 0) { + // The memberSpec overrides the location passed to cluster.start(); use + // the location defined on the member. + if (isAvailabilityZoneEnabled()) { + LOG.warn("Cluster {} has availability-zone enabled, but memberSpec overrides location with {}; using " + + "memberSpec's location; availability-zone behaviour will not apply", this, memberLocations); + } + chosenLocations = Collections.nCopies(delta, memberLocations.get(0)); + } else if (isAvailabilityZoneEnabled()) { + List<Location> subLocations = getNonFailedSubLocations(); + Multimap<Location, Entity> membersByLocation = getMembersByLocation(); + chosenLocations = getZonePlacementStrategy().locationsForAdditions(membersByLocation, subLocations, delta); + if (chosenLocations.size() != delta) { + throw new IllegalStateException("Node placement strategy chose " + Iterables.size(chosenLocations) + + ", when expected delta " + delta + " in " + this); + } + } else { + chosenLocations = Collections.nCopies(delta, getLocation()); + } + + // create and start the entities + return addInEachLocation(chosenLocations, ImmutableMap.of()).getWithError(); + } + + /** <strong>Note</strong> for sub-clases; this method can be called while synchronized on {@link #mutex}. */ + @SuppressWarnings("unchecked") + protected Collection<Entity> shrink(int delta) { + Preconditions.checkArgument(delta < 0, "Must call shrink with negative delta."); + int size = getCurrentSize(); + if (-delta > size) { + // some subclasses (esp in tests) use custom sizes without the members set always being accurate, so put a limit on the size + LOG.warn("Call to shrink "+this+" by "+delta+" when size is "+size+"; amending"); + delta = -size; + } + if (delta==0) return ImmutableList.<Entity>of(); + + Collection<Entity> removedEntities = pickAndRemoveMembers(delta * -1); + + // FIXME symmetry in order of added as child, managed, started, and added to group + Task<?> invoke = Entities.invokeEffector(this, (Iterable<Entity>)(Iterable<?>)Iterables.filter(removedEntities, Startable.class), Startable.STOP, Collections.<String,Object>emptyMap()); + try { + invoke.get(); + return removedEntities; + } catch (Exception e) { + throw Exceptions.propagate(e); + } finally { + for (Entity removedEntity : removedEntities) { + discardNode(removedEntity); + } + } + } + + protected ReferenceWithError<Optional<Entity>> addInSingleLocation(Location location, Map<?,?> flags) { + ReferenceWithError<Collection<Entity>> added = addInEachLocation(ImmutableList.of(location), flags); + + Optional<Entity> result = Iterables.isEmpty(added.getWithoutError()) ? Optional.<Entity>absent() : Optional.of(Iterables.getOnlyElement(added.get())); + if (!added.hasError()) { + return ReferenceWithError.newInstanceWithoutError( result ); + } else { + if (added.masksErrorIfPresent()) { + return ReferenceWithError.newInstanceMaskingError( result, added.getError() ); + } else { + return ReferenceWithError.newInstanceThrowingError( result, added.getError() ); + } + } + } + + protected ReferenceWithError<Collection<Entity>> addInEachLocation(Iterable<Location> locations, Map<?,?> flags) { + List<Entity> addedEntities = Lists.newArrayList(); + Map<Entity, Location> addedEntityLocations = Maps.newLinkedHashMap(); + Map<Entity, Task<?>> tasks = Maps.newLinkedHashMap(); + + for (Location loc : locations) { + Entity entity = addNode(loc, flags); + addedEntities.add(entity); + addedEntityLocations.put(entity, loc); + if (entity instanceof Startable) { + Map<String, ?> args = ImmutableMap.of("locations", ImmutableList.of(loc)); + Task<Void> task = Effectors.invocation(entity, Startable.START, args).asTask(); + tasks.put(entity, task); + } + } + + Task<List<?>> parallel = Tasks.parallel("starting "+tasks.size()+" node"+Strings.s(tasks.size())+" (parallel)", tasks.values()); + TaskTags.markInessential(parallel); + DynamicTasks.queueIfPossible(parallel).orSubmitAsync(this); + Map<Entity, Throwable> errors = waitForTasksOnEntityStart(tasks); + + // if tracking, then report success/fail to the ZoneFailureDetector + if (isAvailabilityZoneEnabled()) { + for (Map.Entry<Entity, Location> entry : addedEntityLocations.entrySet()) { + Entity entity = entry.getKey(); + Location loc = entry.getValue(); + Throwable err = errors.get(entity); + if (err == null) { + getZoneFailureDetector().onStartupSuccess(loc, entity); + } else { + getZoneFailureDetector().onStartupFailure(loc, entity, err); + } + } + } + + Collection<Entity> result = MutableList.<Entity> builder() + .addAll(addedEntities) + .removeAll(errors.keySet()) + .build(); + + // quarantine/cleanup as necessary + if (!errors.isEmpty()) { + if (isQuarantineEnabled()) { + quarantineFailedNodes(errors.keySet()); + } else { + cleanupFailedNodes(errors.keySet()); + } + return ReferenceWithError.newInstanceMaskingError(result, Exceptions.create(errors.values())); + } + + return ReferenceWithError.newInstanceWithoutError(result); + } + + protected void quarantineFailedNodes(Collection<Entity> failedEntities) { + for (Entity entity : failedEntities) { + sensors().emit(ENTITY_QUARANTINED, entity); + getQuarantineGroup().addMember(entity); + removeMember(entity); + } + } + + protected void cleanupFailedNodes(Collection<Entity> failedEntities) { + // TODO Could also call stop on them? + for (Entity entity : failedEntities) { + discardNode(entity); + } + } + + protected Map<Entity, Throwable> waitForTasksOnEntityStart(Map<? extends Entity,? extends Task<?>> tasks) { + // TODO Could have CompoundException, rather than propagating first + Map<Entity, Throwable> errors = Maps.newLinkedHashMap(); + + for (Map.Entry<? extends Entity,? extends Task<?>> entry : tasks.entrySet()) { + Entity entity = entry.getKey(); + Task<?> task = entry.getValue(); + try { + task.get(); + } catch (InterruptedException e) { + throw Exceptions.propagate(e); + } catch (Throwable t) { + Throwable interesting = Exceptions.getFirstInteresting(t); + LOG.error("Cluster "+this+" failed to start entity "+entity+" (removing): "+interesting, interesting); + LOG.debug("Trace for: Cluster "+this+" failed to start entity "+entity+" (removing): "+t, t); + // previously we unwrapped but now there is no need I think + errors.put(entity, t); + } + } + return errors; + } + + @Override + public boolean removeChild(Entity child) { + boolean changed = super.removeChild(child); + if (changed) { + removeMember(child); + } + return changed; + } + + protected Map<?,?> getCustomChildFlags() { + return getConfig(CUSTOM_CHILD_FLAGS); + } + + @Override + public Entity addNode(Location loc, Map<?, ?> extraFlags) { + // In case subclasses are foolish and do not call super.init() when overriding. + initialiseMemberId(); + Map<?, ?> createFlags = MutableMap.builder() + .putAll(getCustomChildFlags()) + .putAll(extraFlags) + .put(CLUSTER_MEMBER_ID, sensors().get(NEXT_CLUSTER_MEMBER_ID).get()) + .build(); + if (LOG.isDebugEnabled()) { - LOG.debug("Creating and adding a node to cluster {}({}) with properties {}", new Object[] { this, getId(), createFlags }); ++ LOG.debug("Creating and adding a node to cluster {}({}) with properties {}", new Object[] { this, getId(), Sanitizer.sanitize(createFlags) }); + } + + // TODO should refactor to have a createNodeSpec; and spec should support initial sensor values + Entity entity = createNode(loc, createFlags); + + entity.sensors().set(CLUSTER_MEMBER, true); + entity.sensors().set(CLUSTER, this); + + // Continue to call manage(), because some uses of NodeFactory (in tests) still instantiate the + // entity via its constructor + Entities.manage(entity); + + addMember(entity); + return entity; + } + + protected Entity createNode(@Nullable Location loc, Map<?,?> flags) { + EntitySpec<?> memberSpec = null; + if (getMembers().isEmpty()) memberSpec = getFirstMemberSpec(); + if (memberSpec == null) memberSpec = getMemberSpec(); + + if (memberSpec != null) { + return addChild(EntitySpec.create(memberSpec).configure(flags).location(loc)); + } + + EntityFactory<?> factory = getFactory(); + if (factory == null) { + throw new IllegalStateException("No member spec nor entity factory supplied for dynamic cluster "+this); + } + EntityFactory<?> factoryToUse = (factory instanceof EntityFactoryForLocation) ? ((EntityFactoryForLocation<?>) factory).newFactoryForLocation(loc) : factory; + Entity entity = factoryToUse.newEntity(flags, this); + if (entity==null) { + throw new IllegalStateException("EntityFactory factory routine returned null entity, in "+this); + } + if (entity.getParent()==null) entity.setParent(this); + + return entity; + } + + protected List<Entity> pickAndRemoveMembers(int delta) { + if (delta==0) + return Lists.newArrayList(); + + if (delta == 1 && !isAvailabilityZoneEnabled()) { + Maybe<Entity> member = tryPickAndRemoveMember(); + return (member.isPresent()) ? ImmutableList.of(member.get()) : ImmutableList.<Entity>of(); + } + + // TODO inefficient impl + Preconditions.checkState(getMembers().size() > 0, "Attempt to remove a node (delta "+delta+") when members is empty, from cluster " + this); + if (LOG.isDebugEnabled()) LOG.debug("Removing a node from {}", this); + + if (isAvailabilityZoneEnabled()) { + Multimap<Location, Entity> membersByLocation = getMembersByLocation(); + List<Entity> entities = getZonePlacementStrategy().entitiesToRemove(membersByLocation, delta); + + Preconditions.checkState(entities.size() == delta, "Incorrect num entity chosen for removal from %s (%s when expected %s)", + getId(), entities.size(), delta); + + for (Entity entity : entities) { + removeMember(entity); + } + return entities; + } else { + List<Entity> entities = Lists.newArrayList(); + for (int i = 0; i < delta; i++) { + // don't assume we have enough members; e.g. if shrinking to zero and someone else concurrently stops a member, + // then just return what we were able to remove. + Maybe<Entity> member = tryPickAndRemoveMember(); + if (member.isPresent()) entities.add(member.get()); + } + return entities; + } + } + + private Maybe<Entity> tryPickAndRemoveMember() { + assert !isAvailabilityZoneEnabled() : "should instead call pickAndRemoveMembers(int) if using availability zones"; + + // TODO inefficient impl + Collection<Entity> members = getMembers(); + if (members.isEmpty()) return Maybe.absent(); + + if (LOG.isDebugEnabled()) LOG.debug("Removing a node from {}", this); + Entity entity = getRemovalStrategy().apply(members); + Preconditions.checkNotNull(entity, "No entity chosen for removal from "+getId()); + + removeMember(entity); + return Maybe.of(entity); + } + + protected void discardNode(Entity entity) { + removeMember(entity); + try { + Entities.unmanage(entity); + } catch (IllegalStateException e) { + //probably already unmanaged + LOG.debug("Exception during removing member of cluster " + this + ", unmanaging node " + entity + ". The node is probably already unmanaged.", e); + } + } + + protected void stopAndRemoveNode(Entity member) { + removeMember(member); + + try { + if (member instanceof Startable) { + Task<?> task = member.invoke(Startable.STOP, Collections.<String,Object>emptyMap()); + task.getUnchecked(); + } + } finally { + Entities.unmanage(member); + } + } + }
