[ https://issues.apache.org/jira/browse/BROOKLYN-544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16201662#comment-16201662 ]
ASF GitHub Bot commented on BROOKLYN-544: ----------------------------------------- Github user aledsage commented on a diff in the pull request: https://github.com/apache/brooklyn-server/pull/860#discussion_r144226175 --- Diff: core/src/main/java/org/apache/brooklyn/core/sensor/AttributeMap.java --- @@ -138,62 +165,91 @@ private void checkPath(Collection<String> path) { Preconditions.checkArgument(!path.isEmpty(), "path can't be empty"); } + /** + * Sets a value and publishes it. + * <p> + * Constraints of {@link #getLockInternal()} apply, with lock-holder being the supplied {@link Function}. + */ public <T> T update(AttributeSensor<T> attribute, T newValue) { // 2017-10 this was unsynched which meant if two threads updated // the last publication would not correspond to the last value. - // could introduce deadlock but emit internal and publish should - // not seek any locks. _subscribe_ and _delivery_ might, but they - // won't be in this block. an issue with _subscribe-and-get-initial_ - // should be resolved by initial subscription queueing the publication - // to a context where locks are not held. - synchronized (values) { - T oldValue = updateWithoutPublishing(attribute, newValue); + // a crude `synchronized (values)` could cause deadlock as + // this does publish (doing `synch(LSM)`) whereas _subscribe_ + // would have the LSM lock and try to `synch(values)` + // as described at https://issues.apache.org/jira/browse/BROOKLYN-544. 
+ // the addition of getLockInternal should make this much cleaner, + // as no one holding `synch(LSM)` should be updating here, + // and gets here won't call any other code at all + return withLock(() -> { + T oldValue = updateInternalWithoutLockOrPublish(attribute, newValue); entity.emitInternal(attribute, newValue); return oldValue; - } + }); } + /** @deprecated since 0.13.0 this is becoming an internal method, {@link #updateInternalWithoutLockOrPublish(AttributeSensor, Object)} */ + @Deprecated public <T> T updateWithoutPublishing(AttributeSensor<T> attribute, T newValue) { - if (log.isTraceEnabled()) { - Object oldValue = getValue(attribute); - if (!Objects.equal(oldValue, newValue != null)) { - log.trace("setting attribute {} to {} (was {}) on {}", new Object[] {attribute.getName(), newValue, oldValue, entity}); - } else { - log.trace("setting attribute {} to {} (unchanged) on {}", new Object[] {attribute.getName(), newValue, this}); + return updateInternalWithoutLockOrPublish(attribute, newValue); + } + + @Beta + public <T> T updateInternalWithoutLockOrPublish(AttributeSensor<T> attribute, T newValue) { + synchronized (values) { + if (log.isTraceEnabled()) { + Object oldValue = getValue(attribute); + if (!Objects.equal(oldValue, newValue != null)) { + log.trace("setting attribute {} to {} (was {}) on {}", new Object[] {attribute.getName(), newValue, oldValue, entity}); + } else { + log.trace("setting attribute {} to {} (unchanged) on {}", new Object[] {attribute.getName(), newValue, this}); + } } + + T oldValue = update(attribute.getNameParts(), newValue); + return (isNull(oldValue)) ? null : oldValue; } - - T oldValue = update(attribute.getNameParts(), newValue); - - return (isNull(oldValue)) ? 
null : oldValue; } + private <T> T withLock(Callable<T> body) { return Locks.withLock(getLockInternal(), body); } + private void withLock(Runnable body) { Locks.withLock(getLockInternal(), body); } + /** - * Where atomicity is desired, the methods in this class synchronize on the {@link #values} map. + * Where atomicity is desired, this method wraps an acquisition of {@link #getLockInternal()} + * while applying the given {@link Function}. + * <p> + * Constraints of {@link #getLockInternal()} apply, with lock-holder being the supplied {@link Function}. */ public <T> T modify(AttributeSensor<T> attribute, Function<? super T, Maybe<T>> modifier) { - synchronized (values) { + return withLock(() -> { T oldValue = getValue(attribute); Maybe<? extends T> newValue = modifier.apply(oldValue); - + if (newValue.isPresent()) { if (log.isTraceEnabled()) log.trace("modified attribute {} to {} (was {}) on {}", new Object[] {attribute.getName(), newValue, oldValue, entity}); return update(attribute, newValue.get()); } else { if (log.isTraceEnabled()) log.trace("modified attribute {} unchanged; not emitting on {}", new Object[] {attribute.getName(), newValue, this}); return oldValue; } - } + }); } + /** Removes a sensor. The {@link #getLockInternal()} is acquired, + * and constraints on the caller described there apply to callers of this method. + * <p> + * The caller is responsible for any subsequent actions needed, including publishing + * the removal of the sensor and triggering persistence. Caller may wish to + * have the {@link #getLockInternal()} while doing that. 
*/ public void remove(AttributeSensor<?> attribute) { BrooklynLogging.log(log, BrooklynLogging.levelDebugOrTraceIfReadOnly(entity), "removing attribute {} on {}", attribute.getName(), entity); - - remove(attribute.getNameParts()); + withLock(() -> remove(attribute.getNameParts()) ); } // TODO path must be ordered(and legal to contain duplicates like "a.b.a"; list would be better + /** @deprecated since 0.13.0 becoming private; callers should only ever use {@link #remove(AttributeSensor)} --- End diff -- `since 1.0.0` > Deadlock in enricher's subscription > ----------------------------------- > > Key: BROOKLYN-544 > URL: https://issues.apache.org/jira/browse/BROOKLYN-544 > Project: Brooklyn > Issue Type: Bug > Reporter: Aled Sage > Priority: Blocker > > With brooklyn 1.0.0-SNAPSHOT, running the test > {{YamlRollingTimeWindowMeanEnricherTest}} many times, it hung with the > deadlock shown below: > {noformat} > Found one Java-level deadlock: > ============================= > "brooklyn-execmanager-CkWF5Jg7-1": > waiting to lock monitor 0x00007f8e9c025de8 (object 0x00000007b5fee528, a > org.apache.brooklyn.core.mgmt.internal.LocalSubscriptionManager), > which is held by "main" > "main": > waiting to lock monitor 0x00007f8e9b95ebe8 (object 0x00000007b6042cf8, a > java.util.Collections$SynchronizedMap), > which is held by "brooklyn-execmanager-CkWF5Jg7-1" > Java stack information for the threads listed above: > =================================================== > "brooklyn-execmanager-CkWF5Jg7-1": > at > org.apache.brooklyn.core.mgmt.internal.LocalSubscriptionManager.getSubscriptionsForEntitySensor(LocalSubscriptionManager.java:207) > - waiting to lock <0x00000007b5fee528> (a > org.apache.brooklyn.core.mgmt.internal.LocalSubscriptionManager) > at > org.apache.brooklyn.core.mgmt.internal.LocalSubscriptionManager.publish(LocalSubscriptionManager.java:255) > at > org.apache.brooklyn.core.mgmt.internal.BasicSubscriptionContext.publish(BasicSubscriptionContext.java:176) > at 
> org.apache.brooklyn.core.entity.AbstractEntity$BasicSensorSupport.emitInternal(AbstractEntity.java:1059) > at > org.apache.brooklyn.core.sensor.AttributeMap.update(AttributeMap.java:151) > - locked <0x00000007b6042cf8> (a > java.util.Collections$SynchronizedMap) > at > org.apache.brooklyn.core.entity.AbstractEntity$BasicSensorSupport.set(AbstractEntity.java:957) > at > org.apache.brooklyn.core.enricher.AbstractEnricher.emit(AbstractEnricher.java:144) > at > org.apache.brooklyn.enricher.stock.AbstractTransformer.onEvent(AbstractTransformer.java:157) > at > org.apache.brooklyn.core.mgmt.internal.LocalSubscriptionManager$1.run(LocalSubscriptionManager.java:336) > at > org.apache.brooklyn.core.mgmt.internal.LocalSubscriptionManager.submitPublishEvent(LocalSubscriptionManager.java:352) > at > org.apache.brooklyn.core.mgmt.internal.LocalSubscriptionManager.lambda$0(LocalSubscriptionManager.java:192) > at > org.apache.brooklyn.core.mgmt.internal.LocalSubscriptionManager$$Lambda$3/871160466.run(Unknown > Source) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at > org.apache.brooklyn.util.core.task.BasicExecutionManager$SubmissionCallable.call(BasicExecutionManager.java:565) > at > org.apache.brooklyn.util.core.task.SingleThreadedScheduler$1.call(SingleThreadedScheduler.java:116) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > "main": > at java.util.Collections$SynchronizedMap.get(Collections.java:2584) > - waiting to lock <0x00000007b6042cf8> (a > java.util.Collections$SynchronizedMap) > at > org.apache.brooklyn.core.sensor.AttributeMap.getValue(AttributeMap.java:219) > at > org.apache.brooklyn.core.sensor.AttributeMap.getValue(AttributeMap.java:225) > at > 
org.apache.brooklyn.core.entity.AbstractEntity$BasicSensorSupport.get(AbstractEntity.java:932) > at > org.apache.brooklyn.core.mgmt.internal.LocalSubscriptionManager.subscribe(LocalSubscriptionManager.java:147) > - locked <0x00000007b5fee528> (a > org.apache.brooklyn.core.mgmt.internal.LocalSubscriptionManager) > at > org.apache.brooklyn.core.mgmt.internal.AbstractSubscriptionManager.subscribe(AbstractSubscriptionManager.java:106) > at > org.apache.brooklyn.core.mgmt.internal.BasicSubscriptionContext.subscribe(BasicSubscriptionContext.java:98) > at > org.apache.brooklyn.core.mgmt.internal.SubscriptionTracker.subscribe(SubscriptionTracker.java:67) > at > org.apache.brooklyn.core.objs.AbstractEntityAdjunct$BasicSubscriptionSupport.subscribe(AbstractEntityAdjunct.java:238) > at > org.apache.brooklyn.enricher.stock.AbstractTransformer.setEntity(AbstractTransformer.java:120) > at > org.apache.brooklyn.enricher.stock.YamlRollingTimeWindowMeanEnricher.setEntity(YamlRollingTimeWindowMeanEnricher.java:93) > at > org.apache.brooklyn.core.entity.AbstractEntity$BasicEnricherSupport.add(AbstractEntity.java:1443) > at > org.apache.brooklyn.core.entity.AbstractEntity$BasicEnricherSupport.add(AbstractEntity.java:1429) > at > org.apache.brooklyn.enricher.stock.YamlRollingTimeWindowMeanEnricherTest.setUp(YamlRollingTimeWindowMeanEnricherTest.java:68) > at sun.reflect.GeneratedMethodAccessor16.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:104) > at > org.testng.internal.Invoker.invokeConfigurationMethod(Invoker.java:515) > at org.testng.internal.Invoker.invokeConfigurations(Invoker.java:217) > at org.testng.internal.Invoker.invokeMethod(Invoker.java:590) > at org.testng.internal.Invoker.invokeTestMethod(Invoker.java:851) > at 
org.testng.internal.Invoker.invokeTestMethods(Invoker.java:1177) > at > org.testng.internal.TestMethodWorker.invokeTestMethods(TestMethodWorker.java:129) > at org.testng.internal.TestMethodWorker.run(TestMethodWorker.java:112) > at org.testng.TestRunner.privateRun(TestRunner.java:756) > at org.testng.TestRunner.run(TestRunner.java:610) > at org.testng.SuiteRunner.runTest(SuiteRunner.java:387) > at org.testng.SuiteRunner.runSequentially(SuiteRunner.java:382) > at org.testng.SuiteRunner.privateRun(SuiteRunner.java:340) > at org.testng.SuiteRunner.run(SuiteRunner.java:289) > at org.testng.SuiteRunnerWorker.runSuite(SuiteRunnerWorker.java:52) > at org.testng.SuiteRunnerWorker.run(SuiteRunnerWorker.java:86) > at org.testng.TestNG.runSuitesSequentially(TestNG.java:1293) > at org.testng.TestNG.runSuitesLocally(TestNG.java:1218) > at org.testng.TestNG.runSuites(TestNG.java:1133) > at org.testng.TestNG.run(TestNG.java:1104) > at > org.testng.remote.AbstractRemoteTestNG.run(AbstractRemoteTestNG.java:132) > at org.testng.remote.RemoteTestNG.initAndRun(RemoteTestNG.java:230) > at org.testng.remote.RemoteTestNG.main(RemoteTestNG.java:76) > Found 1 deadlock. > {noformat} -- This message was sent by Atlassian JIRA (v6.4.14#64029)