[
https://issues.apache.org/jira/browse/BROOKLYN-544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16201662#comment-16201662
]
ASF GitHub Bot commented on BROOKLYN-544:
-----------------------------------------
Github user aledsage commented on a diff in the pull request:
https://github.com/apache/brooklyn-server/pull/860#discussion_r144226175
--- Diff:
core/src/main/java/org/apache/brooklyn/core/sensor/AttributeMap.java ---
@@ -138,62 +165,91 @@ private void checkPath(Collection<String> path) {
Preconditions.checkArgument(!path.isEmpty(), "path can't be
empty");
}
+ /**
+ * Sets a value and publishes it.
+ * <p>
+ * Constraints of {@link #getLockInternal()} apply, with lock-holder
being the supplied {@link Function}.
+ */
public <T> T update(AttributeSensor<T> attribute, T newValue) {
// 2017-10 this was unsynched which meant if two threads updated
// the last publication would not correspond to the last value.
- // could introduce deadlock but emit internal and publish should
- // not seek any locks. _subscribe_ and _delivery_ might, but they
- // won't be in this block. an issue with
_subscribe-and-get-initial_
- // should be resolved by initial subscription queueing the
publication
- // to a context where locks are not held.
- synchronized (values) {
- T oldValue = updateWithoutPublishing(attribute, newValue);
+ // a crude `synchronized (values)` could cause deadlock as
+ // this does publish (doing `synch(LSM)`) whereas _subscribe_
+ // would have the LSM lock and try to `synch(values)`
+ // as described at
https://issues.apache.org/jira/browse/BROOKLYN-544.
+ // the addition of getLockInternal should make this much cleaner,
+ // as no one holding `synch(LSM)` should be updating here,
+ // and gets here won't call any other code at all
+ return withLock(() -> {
+ T oldValue = updateInternalWithoutLockOrPublish(attribute,
newValue);
entity.emitInternal(attribute, newValue);
return oldValue;
- }
+ });
}
+ /** @deprecated since 0.13.0 this is becoming an internal method,
{@link #updateInternalWithoutLockOrPublish(AttributeSensor, Object)} */
+ @Deprecated
public <T> T updateWithoutPublishing(AttributeSensor<T> attribute, T
newValue) {
- if (log.isTraceEnabled()) {
- Object oldValue = getValue(attribute);
- if (!Objects.equal(oldValue, newValue != null)) {
- log.trace("setting attribute {} to {} (was {}) on {}", new
Object[] {attribute.getName(), newValue, oldValue, entity});
- } else {
- log.trace("setting attribute {} to {} (unchanged) on {}",
new Object[] {attribute.getName(), newValue, this});
+ return updateInternalWithoutLockOrPublish(attribute, newValue);
+ }
+
+ @Beta
+ public <T> T updateInternalWithoutLockOrPublish(AttributeSensor<T>
attribute, T newValue) {
+ synchronized (values) {
+ if (log.isTraceEnabled()) {
+ Object oldValue = getValue(attribute);
+ if (!Objects.equal(oldValue, newValue != null)) {
+ log.trace("setting attribute {} to {} (was {}) on {}",
new Object[] {attribute.getName(), newValue, oldValue, entity});
+ } else {
+ log.trace("setting attribute {} to {} (unchanged) on
{}", new Object[] {attribute.getName(), newValue, this});
+ }
}
+
+ T oldValue = update(attribute.getNameParts(), newValue);
+ return (isNull(oldValue)) ? null : oldValue;
}
-
- T oldValue = update(attribute.getNameParts(), newValue);
-
- return (isNull(oldValue)) ? null : oldValue;
}
+ private <T> T withLock(Callable<T> body) { return
Locks.withLock(getLockInternal(), body); }
+ private void withLock(Runnable body) {
Locks.withLock(getLockInternal(), body); }
+
/**
- * Where atomicity is desired, the methods in this class synchronize
on the {@link #values} map.
+ * Where atomicity is desired, this method wraps an acquisition of
{@link #getLockInternal()}
+ * while applying the given {@link Function}.
+ * <p>
+ * Constraints of {@link #getLockInternal()} apply, with lock-holder
being the supplied {@link Function}.
*/
public <T> T modify(AttributeSensor<T> attribute, Function<? super T,
Maybe<T>> modifier) {
- synchronized (values) {
+ return withLock(() -> {
T oldValue = getValue(attribute);
Maybe<? extends T> newValue = modifier.apply(oldValue);
-
+
if (newValue.isPresent()) {
if (log.isTraceEnabled()) log.trace("modified attribute {}
to {} (was {}) on {}", new Object[] {attribute.getName(), newValue, oldValue,
entity});
return update(attribute, newValue.get());
} else {
if (log.isTraceEnabled()) log.trace("modified attribute {}
unchanged; not emitting on {}", new Object[] {attribute.getName(), newValue,
this});
return oldValue;
}
- }
+ });
}
+ /** Removes a sensor. The {@link #getLockInternal()} is acquired,
+ * and constraints on the caller described there apply to callers of
this method.
+ * <p>
+ * The caller is responsible for any subsequent actions needed,
including publishing
+ * the removal of the sensor and triggering persistence. Caller may
wish to
+ * have the {@link #getLockInternal()} while doing that. */
public void remove(AttributeSensor<?> attribute) {
BrooklynLogging.log(log,
BrooklynLogging.levelDebugOrTraceIfReadOnly(entity),
"removing attribute {} on {}", attribute.getName(), entity);
-
- remove(attribute.getNameParts());
+ withLock(() -> remove(attribute.getNameParts()) );
}
// TODO path must be ordered (and legal to contain duplicates like
"a.b.a"); list would be better
+ /** @deprecated since 0.13.0 becoming private; callers should only
ever use {@link #remove(AttributeSensor)}
--- End diff --
`since 1.0.0`
> Deadlock in enricher's subscription
> -----------------------------------
>
> Key: BROOKLYN-544
> URL: https://issues.apache.org/jira/browse/BROOKLYN-544
> Project: Brooklyn
> Issue Type: Bug
> Reporter: Aled Sage
> Priority: Blocker
>
> With brooklyn 1.0.0-SNAPSHOT, running the test
> {{YamlRollingTimeWindowMeanEnricherTest}} many times, it hung with the
> deadlock shown below:
> {noformat}
> Found one Java-level deadlock:
> =============================
> "brooklyn-execmanager-CkWF5Jg7-1":
> waiting to lock monitor 0x00007f8e9c025de8 (object 0x00000007b5fee528, a
> org.apache.brooklyn.core.mgmt.internal.LocalSubscriptionManager),
> which is held by "main"
> "main":
> waiting to lock monitor 0x00007f8e9b95ebe8 (object 0x00000007b6042cf8, a
> java.util.Collections$SynchronizedMap),
> which is held by "brooklyn-execmanager-CkWF5Jg7-1"
> Java stack information for the threads listed above:
> ===================================================
> "brooklyn-execmanager-CkWF5Jg7-1":
> at
> org.apache.brooklyn.core.mgmt.internal.LocalSubscriptionManager.getSubscriptionsForEntitySensor(LocalSubscriptionManager.java:207)
> - waiting to lock <0x00000007b5fee528> (a
> org.apache.brooklyn.core.mgmt.internal.LocalSubscriptionManager)
> at
> org.apache.brooklyn.core.mgmt.internal.LocalSubscriptionManager.publish(LocalSubscriptionManager.java:255)
> at
> org.apache.brooklyn.core.mgmt.internal.BasicSubscriptionContext.publish(BasicSubscriptionContext.java:176)
> at
> org.apache.brooklyn.core.entity.AbstractEntity$BasicSensorSupport.emitInternal(AbstractEntity.java:1059)
> at
> org.apache.brooklyn.core.sensor.AttributeMap.update(AttributeMap.java:151)
> - locked <0x00000007b6042cf8> (a
> java.util.Collections$SynchronizedMap)
> at
> org.apache.brooklyn.core.entity.AbstractEntity$BasicSensorSupport.set(AbstractEntity.java:957)
> at
> org.apache.brooklyn.core.enricher.AbstractEnricher.emit(AbstractEnricher.java:144)
> at
> org.apache.brooklyn.enricher.stock.AbstractTransformer.onEvent(AbstractTransformer.java:157)
> at
> org.apache.brooklyn.core.mgmt.internal.LocalSubscriptionManager$1.run(LocalSubscriptionManager.java:336)
> at
> org.apache.brooklyn.core.mgmt.internal.LocalSubscriptionManager.submitPublishEvent(LocalSubscriptionManager.java:352)
> at
> org.apache.brooklyn.core.mgmt.internal.LocalSubscriptionManager.lambda$0(LocalSubscriptionManager.java:192)
> at
> org.apache.brooklyn.core.mgmt.internal.LocalSubscriptionManager$$Lambda$3/871160466.run(Unknown
> Source)
> at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at
> org.apache.brooklyn.util.core.task.BasicExecutionManager$SubmissionCallable.call(BasicExecutionManager.java:565)
> at
> org.apache.brooklyn.util.core.task.SingleThreadedScheduler$1.call(SingleThreadedScheduler.java:116)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> "main":
> at java.util.Collections$SynchronizedMap.get(Collections.java:2584)
> - waiting to lock <0x00000007b6042cf8> (a
> java.util.Collections$SynchronizedMap)
> at
> org.apache.brooklyn.core.sensor.AttributeMap.getValue(AttributeMap.java:219)
> at
> org.apache.brooklyn.core.sensor.AttributeMap.getValue(AttributeMap.java:225)
> at
> org.apache.brooklyn.core.entity.AbstractEntity$BasicSensorSupport.get(AbstractEntity.java:932)
> at
> org.apache.brooklyn.core.mgmt.internal.LocalSubscriptionManager.subscribe(LocalSubscriptionManager.java:147)
> - locked <0x00000007b5fee528> (a
> org.apache.brooklyn.core.mgmt.internal.LocalSubscriptionManager)
> at
> org.apache.brooklyn.core.mgmt.internal.AbstractSubscriptionManager.subscribe(AbstractSubscriptionManager.java:106)
> at
> org.apache.brooklyn.core.mgmt.internal.BasicSubscriptionContext.subscribe(BasicSubscriptionContext.java:98)
> at
> org.apache.brooklyn.core.mgmt.internal.SubscriptionTracker.subscribe(SubscriptionTracker.java:67)
> at
> org.apache.brooklyn.core.objs.AbstractEntityAdjunct$BasicSubscriptionSupport.subscribe(AbstractEntityAdjunct.java:238)
> at
> org.apache.brooklyn.enricher.stock.AbstractTransformer.setEntity(AbstractTransformer.java:120)
> at
> org.apache.brooklyn.enricher.stock.YamlRollingTimeWindowMeanEnricher.setEntity(YamlRollingTimeWindowMeanEnricher.java:93)
> at
> org.apache.brooklyn.core.entity.AbstractEntity$BasicEnricherSupport.add(AbstractEntity.java:1443)
> at
> org.apache.brooklyn.core.entity.AbstractEntity$BasicEnricherSupport.add(AbstractEntity.java:1429)
> at
> org.apache.brooklyn.enricher.stock.YamlRollingTimeWindowMeanEnricherTest.setUp(YamlRollingTimeWindowMeanEnricherTest.java:68)
> at sun.reflect.GeneratedMethodAccessor16.invoke(Unknown Source)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:104)
> at
> org.testng.internal.Invoker.invokeConfigurationMethod(Invoker.java:515)
> at org.testng.internal.Invoker.invokeConfigurations(Invoker.java:217)
> at org.testng.internal.Invoker.invokeMethod(Invoker.java:590)
> at org.testng.internal.Invoker.invokeTestMethod(Invoker.java:851)
> at org.testng.internal.Invoker.invokeTestMethods(Invoker.java:1177)
> at
> org.testng.internal.TestMethodWorker.invokeTestMethods(TestMethodWorker.java:129)
> at org.testng.internal.TestMethodWorker.run(TestMethodWorker.java:112)
> at org.testng.TestRunner.privateRun(TestRunner.java:756)
> at org.testng.TestRunner.run(TestRunner.java:610)
> at org.testng.SuiteRunner.runTest(SuiteRunner.java:387)
> at org.testng.SuiteRunner.runSequentially(SuiteRunner.java:382)
> at org.testng.SuiteRunner.privateRun(SuiteRunner.java:340)
> at org.testng.SuiteRunner.run(SuiteRunner.java:289)
> at org.testng.SuiteRunnerWorker.runSuite(SuiteRunnerWorker.java:52)
> at org.testng.SuiteRunnerWorker.run(SuiteRunnerWorker.java:86)
> at org.testng.TestNG.runSuitesSequentially(TestNG.java:1293)
> at org.testng.TestNG.runSuitesLocally(TestNG.java:1218)
> at org.testng.TestNG.runSuites(TestNG.java:1133)
> at org.testng.TestNG.run(TestNG.java:1104)
> at
> org.testng.remote.AbstractRemoteTestNG.run(AbstractRemoteTestNG.java:132)
> at org.testng.remote.RemoteTestNG.initAndRun(RemoteTestNG.java:230)
> at org.testng.remote.RemoteTestNG.main(RemoteTestNG.java:76)
> Found 1 deadlock.
> {noformat}
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)