Repository: brooklyn-server Updated Branches: refs/heads/master 0df0a4889 -> 5361c4d82
define canonical synch order to fix potential deadlock and now preserve happens-before for child add-remove see: https://issues.apache.org/jira/browse/BROOKLYN-498 Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/7df1afe5 Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/7df1afe5 Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/7df1afe5 Branch: refs/heads/master Commit: 7df1afe5af01c1b02fa08760ff251de30c20bc7e Parents: 48933eb Author: Alex Heneveld <[email protected]> Authored: Mon May 8 10:32:09 2017 +0100 Committer: Alex Heneveld <[email protected]> Committed: Mon May 8 10:32:09 2017 +0100 ---------------------------------------------------------------------- .../brooklyn/core/entity/AbstractEntity.java | 65 +++--- .../brooklyn/core/sensor/AttributeMap.java | 10 + .../entity/group/AbstractGroupImpl.java | 212 ++++++++++--------- 3 files changed, 159 insertions(+), 128 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/7df1afe5/core/src/main/java/org/apache/brooklyn/core/entity/AbstractEntity.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/entity/AbstractEntity.java b/core/src/main/java/org/apache/brooklyn/core/entity/AbstractEntity.java index 6c15afc..ab02285 100644 --- a/core/src/main/java/org/apache/brooklyn/core/entity/AbstractEntity.java +++ b/core/src/main/java/org/apache/brooklyn/core/entity/AbstractEntity.java @@ -217,8 +217,15 @@ public abstract class AbstractEntity extends AbstractBrooklynObject implements E // values. They must be thread-safe, and where necessary (e.g. group) they should preserve order // if possible. private Reference<Entity> parent = new BasicReference<Entity>(); - private Set<Group> groupsInternal = Collections.synchronizedSet(Sets.<Group>newLinkedHashSet()); + /** Synchronize on this when updating to ensure addition/removals done in order, + * and notifications done in order. + * If calling other code while holding this synch lock, any synch locks it might call should be called first + * (in particular the AttributeMap.values should be obtained before this if publishing.) */ private Set<Entity> children = Collections.synchronizedSet(Sets.<Entity>newLinkedHashSet()); + /** Synchronize on this to ensure group and members are updated at the same time. + * Synchronize behavior as for {@link #children} apply here, and in addition + * the parent's "members" lock should be obtained first. */ + private Set<Group> groupsInternal = Collections.synchronizedSet(Sets.<Group>newLinkedHashSet()); private Reference<List<Location>> locations = new BasicReference<List<Location>>(ImmutableList.<Location>of()); // dups removed in addLocations private Reference<Long> creationTimeUtc = new BasicReference<Long>(System.currentTimeMillis()); private Reference<String> displayName = new BasicReference<String>(); @@ -554,6 +561,14 @@ public abstract class AbstractEntity extends AbstractBrooklynObject implements E } } + /** Where code needs to synch on the attributes, it can access the low-level object used for synching + * through this method. Internally, all attribute updates synch on this object. Code wishing to + * update attributes or publish while holding some other lock should acquire the monitor on this + * object first to prevent deadlock. */ + protected Object getAttributesSynchObjectInternal() { + return attributesInternal.getSynchObjectInternal(); + } + @Override public Map<String, String> toMetadataRecord() { return ImmutableMap.of(); @@ -667,19 +682,18 @@ public abstract class AbstractEntity extends AbstractBrooklynObject implements E checkNotNull(child, "child must not be null (for entity %s)", this); CatalogUtils.setCatalogItemIdOnAddition(this, child); - boolean changed; - synchronized (children) { - if (Entities.isAncestor(this, child)) throw new IllegalStateException("loop detected trying to add child "+child+" to "+this+"; it is already an ancestor"); - child.setParent(getProxyIfAvailable()); - changed = children.add(child); - - getManagementSupport().getEntityChangeListener().onChildrenChanged(); - } - - // TODO not holding synchronization lock while notifying risks out-of-order if addChild+removeChild called in rapid succession. - // But doing notification in synchronization block may risk deadlock? - if (changed) { - sensors().emit(AbstractEntity.CHILD_ADDED, child); + synchronized (getAttributesSynchObjectInternal()) { + // hold synch locks in this order to prevent deadlock + synchronized (children) { + if (Entities.isAncestor(this, child)) throw new IllegalStateException("loop detected trying to add child "+child+" to "+this+"; it is already an ancestor"); + child.setParent(getProxyIfAvailable()); + boolean changed = children.add(child); + + getManagementSupport().getEntityChangeListener().onChildrenChanged(); + if (changed) { + sensors().emit(AbstractEntity.CHILD_ADDED, child); + } + } } return child; } @@ -709,20 +723,21 @@ public abstract class AbstractEntity extends AbstractBrooklynObject implements E @Override public boolean removeChild(Entity child) { - boolean changed; - synchronized (children) { - changed = children.remove(child); - child.clearParent(); + synchronized (getAttributesSynchObjectInternal()) { + synchronized (children) { + boolean changed = children.remove(child); + child.clearParent(); + + if (changed) { + getManagementSupport().getEntityChangeListener().onChildrenChanged(); + } - if (changed) { - getManagementSupport().getEntityChangeListener().onChildrenChanged(); + if (changed) { + sensors().emit(AbstractEntity.CHILD_REMOVED, child); + } + return changed; } } - - if (changed) { - sensors().emit(AbstractEntity.CHILD_REMOVED, child); - } - return changed; } // -------- GROUPS -------------- http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/7df1afe5/core/src/main/java/org/apache/brooklyn/core/sensor/AttributeMap.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/sensor/AttributeMap.java b/core/src/main/java/org/apache/brooklyn/core/sensor/AttributeMap.java index 50a95c4..dee0700 100644 --- a/core/src/main/java/org/apache/brooklyn/core/sensor/AttributeMap.java +++ b/core/src/main/java/org/apache/brooklyn/core/sensor/AttributeMap.java @@ -33,6 +33,7 @@ import org.apache.brooklyn.util.guava.Maybe; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.annotations.Beta; import com.google.common.base.Function; import com.google.common.base.Joiner; import com.google.common.base.Objects; @@ -80,6 +81,15 @@ public final class AttributeMap { this.values = checkNotNull(storage, "storage map must not be null"); } + /** Internal object this class synchs on when modifying values. + * Exposed for internal usage to synchronize on this to enforce canonical order. + * @return + */ + @Beta + public Object getSynchObjectInternal() { + return values; + } + public Map<Collection<String>, Object> asRawMap() { synchronized (values) { return ImmutableMap.copyOf(values); http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/7df1afe5/core/src/main/java/org/apache/brooklyn/entity/group/AbstractGroupImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/entity/group/AbstractGroupImpl.java b/core/src/main/java/org/apache/brooklyn/entity/group/AbstractGroupImpl.java index 6ca8b74..a4bbf62 100644 --- a/core/src/main/java/org/apache/brooklyn/entity/group/AbstractGroupImpl.java +++ b/core/src/main/java/org/apache/brooklyn/entity/group/AbstractGroupImpl.java @@ -119,53 +119,55 @@ public abstract class AbstractGroupImpl extends AbstractEntity implements Abstra */ @Override public boolean addMember(Entity member) { - synchronized (members) { - if (Entities.isNoLongerManaged(member)) { - // Don't add dead entities, as they could never be removed (because addMember could be called in - // concurrent thread as removeMember triggered by the unmanage). - // Not using Entities.isManaged here, as could be called in entity.init() - log.debug("Group {} ignoring new member {}, because it is no longer managed", this, member); - return false; - } - - // FIXME do not set sensors on members; possibly we don't need FIRST at all, just look at the first in MEMBERS, and take care to guarantee order there - Entity first = getAttribute(FIRST); - if (first == null) { - member.sensors().set(FIRST_MEMBER, true); - member.sensors().set(FIRST, member); - sensors().set(FIRST, member); - } else { - if (first.equals(member) || first.equals(member.getAttribute(FIRST))) { - // do nothing (rebinding) + synchronized (getAttributesSynchObjectInternal()) { + synchronized (members) { + if (Entities.isNoLongerManaged(member)) { + // Don't add dead entities, as they could never be removed (because addMember could be called in + // concurrent thread as removeMember triggered by the unmanage). + // Not using Entities.isManaged here, as could be called in entity.init() + log.debug("Group {} ignoring new member {}, because it is no longer managed", this, member); + return false; + } + + // FIXME do not set sensors on members; possibly we don't need FIRST at all, just look at the first in MEMBERS, and take care to guarantee order there + Entity first = getAttribute(FIRST); + if (first == null) { + member.sensors().set(FIRST_MEMBER, true); + member.sensors().set(FIRST, member); + sensors().set(FIRST, member); } else { - member.sensors().set(FIRST_MEMBER, false); - member.sensors().set(FIRST, first); + if (first.equals(member) || first.equals(member.getAttribute(FIRST))) { + // do nothing (rebinding) + } else { + member.sensors().set(FIRST_MEMBER, false); + member.sensors().set(FIRST, first); + } } - } - - ((EntityInternal)member).groups().add((Group)getProxyIfAvailable()); - boolean changed = addMemberInternal(member); - if (changed) { - log.debug("Group {} got new member {}", this, member); - sensors().set(GROUP_SIZE, getCurrentSize()); - sensors().set(GROUP_MEMBERS, getMembers()); - // emit after the above so listeners can use getMembers() and getCurrentSize() - sensors().emit(MEMBER_ADDED, member); - - if (Boolean.TRUE.equals(getConfig(MEMBER_DELEGATE_CHILDREN))) { - log.warn("Use of deprecated ConfigKey {} in {} (as of 0.9.0)", MEMBER_DELEGATE_CHILDREN.getName(), this); - Optional<Entity> result = Iterables.tryFind(getChildren(), Predicates.equalTo(member)); - if (!result.isPresent()) { - String nameFormat = Optional.fromNullable(getConfig(MEMBER_DELEGATE_NAME_FORMAT)).or("%s"); - DelegateEntity child = addChild(EntitySpec.create(DelegateEntity.class) - .configure(DelegateEntity.DELEGATE_ENTITY, member) - .displayName(String.format(nameFormat, member.getDisplayName()))); + + ((EntityInternal)member).groups().add((Group)getProxyIfAvailable()); + boolean changed = addMemberInternal(member); + if (changed) { + log.debug("Group {} got new member {}", this, member); + sensors().set(GROUP_SIZE, getCurrentSize()); + sensors().set(GROUP_MEMBERS, getMembers()); + // emit after the above so listeners can use getMembers() and getCurrentSize() + sensors().emit(MEMBER_ADDED, member); + + if (Boolean.TRUE.equals(getConfig(MEMBER_DELEGATE_CHILDREN))) { + log.warn("Use of deprecated ConfigKey {} in {} (as of 0.9.0)", MEMBER_DELEGATE_CHILDREN.getName(), this); + Optional<Entity> result = Iterables.tryFind(getChildren(), Predicates.equalTo(member)); + if (!result.isPresent()) { + String nameFormat = Optional.fromNullable(getConfig(MEMBER_DELEGATE_NAME_FORMAT)).or("%s"); + DelegateEntity child = addChild(EntitySpec.create(DelegateEntity.class) + .configure(DelegateEntity.DELEGATE_ENTITY, member) + .displayName(String.format(nameFormat, member.getDisplayName()))); + } } + + getManagementSupport().getEntityChangeListener().onMembersChanged(); } - - getManagementSupport().getEntityChangeListener().onMembersChanged(); + return changed; } - return changed; } } @@ -181,58 +183,60 @@ public abstract class AbstractGroupImpl extends AbstractEntity implements Abstra */ @Override public boolean removeMember(final Entity member) { - synchronized (members) { - boolean changed = (member != null && members.remove(member)); - if (changed) { - log.debug("Group {} lost member {}", this, member); - // TODO ideally the following are all synched - sensors().set(GROUP_SIZE, getCurrentSize()); - sensors().set(GROUP_MEMBERS, getMembers()); - if (member.equals(getAttribute(FIRST))) { - // TODO should we elect a new FIRST ? as is the *next* will become first. could we do away with FIRST altogether? - sensors().set(FIRST, null); - } - // emit after the above so listeners can use getMembers() and getCurrentSize() - sensors().emit(MEMBER_REMOVED, member); - - if (Boolean.TRUE.equals(getConfig(MEMBER_DELEGATE_CHILDREN))) { - Optional<Entity> result = Iterables.tryFind(getChildren(), new Predicate<Entity>() { - @Override - public boolean apply(Entity input) { - Entity delegate = input.getConfig(DelegateEntity.DELEGATE_ENTITY); - if (delegate == null) return false; - return delegate.equals(member); + synchronized (getAttributesSynchObjectInternal()) { + synchronized (members) { + boolean changed = (member != null && members.remove(member)); + if (changed) { + log.debug("Group {} lost member {}", this, member); + // TODO ideally the following are all synched + sensors().set(GROUP_SIZE, getCurrentSize()); + sensors().set(GROUP_MEMBERS, getMembers()); + if (member.equals(getAttribute(FIRST))) { + // TODO should we elect a new FIRST ? as is the *next* will become first. could we do away with FIRST altogether? + sensors().set(FIRST, null); + } + // emit after the above so listeners can use getMembers() and getCurrentSize() + sensors().emit(MEMBER_REMOVED, member); + + if (Boolean.TRUE.equals(getConfig(MEMBER_DELEGATE_CHILDREN))) { + Optional<Entity> result = Iterables.tryFind(getChildren(), new Predicate<Entity>() { + @Override + public boolean apply(Entity input) { + Entity delegate = input.getConfig(DelegateEntity.DELEGATE_ENTITY); + if (delegate == null) return false; + return delegate.equals(member); + } + }); + if (result.isPresent()) { + Entity child = result.get(); + removeChild(child); + Entities.unmanage(child); } - }); - if (result.isPresent()) { - Entity child = result.get(); - removeChild(child); - Entities.unmanage(child); } + } - - } - - Exception errorRemoving = null; - // suppress errors if member is being unmanaged in parallel - try { - ((EntityInternal)member).groups().remove((Group)getProxyIfAvailable()); - } catch (Exception e) { - Exceptions.propagateIfFatal(e); - errorRemoving = e; - } - - getManagementSupport().getEntityChangeListener().onMembersChanged(); - - if (errorRemoving!=null) { - if (Entities.isNoLongerManaged(member)) { - log.debug("Ignoring error when telling group "+this+" unmanaged member "+member+" is is removed: "+errorRemoving); - } else { - Exceptions.propagate(errorRemoving); + + Exception errorRemoving = null; + // suppress errors if member is being unmanaged in parallel + try { + ((EntityInternal)member).groups().remove((Group)getProxyIfAvailable()); + } catch (Exception e) { + Exceptions.propagateIfFatal(e); + errorRemoving = e; } + + getManagementSupport().getEntityChangeListener().onMembersChanged(); + + if (errorRemoving!=null) { + if (Entities.isNoLongerManaged(member)) { + log.debug("Ignoring error when telling group "+this+" unmanaged member "+member+" is is removed: "+errorRemoving); + } else { + Exceptions.propagate(errorRemoving); + } + } + + return changed; } - - return changed; } } @@ -243,22 +247,24 @@ public abstract class AbstractGroupImpl extends AbstractEntity implements Abstra @Override public void setMembers(Collection<Entity> mm, Predicate<Entity> filter) { - synchronized (members) { - log.debug("Group {} members set explicitly to {} (of which some possibly filtered)", this, members); - List<Entity> mmo = new ArrayList<Entity>(getMembers()); - for (Entity m: mmo) { - if (!(mm.contains(m) && (filter==null || filter.apply(m)))) - // remove, unless already present, being set, and not filtered out - removeMember(m); - } - for (Entity m: mm) { - if ((!mmo.contains(m)) && (filter==null || filter.apply(m))) { - // add if not alrady contained, and not filtered out - addMember(m); + synchronized (getAttributesSynchObjectInternal()) { + synchronized (members) { + log.debug("Group {} members set explicitly to {} (of which some possibly filtered)", this, members); + List<Entity> mmo = new ArrayList<Entity>(getMembers()); + for (Entity m: mmo) { + if (!(mm.contains(m) && (filter==null || filter.apply(m)))) + // remove, unless already present, being set, and not filtered out + removeMember(m); } + for (Entity m: mm) { + if ((!mmo.contains(m)) && (filter==null || filter.apply(m))) { + // add if not alrady contained, and not filtered out + addMember(m); + } + } + + getManagementSupport().getEntityChangeListener().onMembersChanged(); } - - getManagementSupport().getEntityChangeListener().onMembersChanged(); } }
