[
https://issues.apache.org/jira/browse/BROOKLYN-356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15569708#comment-15569708
]
Aled Sage commented on BROOKLYN-356:
------------------------------------
I talked this over with [~svet]. We've done a very short-term improvement of
increasing the timeout. I'm looking at a longer term fix where we don't execute
so many tasks in different threads to get the value.
As Svet described, the {{Transformer}} executes a task to get the value (with a
timeout, because the value might not be available yet). It is trying to do a
non-blocking lookup of the value, but using the task api that requires the work
be executed in another thread. It therefore uses a timeout to terminate the
lookup if it hasn't returned promptly, but that obviously gives a race
condition.
A better solution for such a non-blocking lookup is to ask for the value
immediately, and to get back a {{Maybe}} (similar to guava's {{Optional}}, but
accepts null values). Each DSL (formatString, attributeWhenReady, etc) is of
type {{DeferredSupplier}}. I'm adding another interface ({{ImmediateSupplier}}
with a method {{Maybe<T> getImmediately()}}) that we'll use to get the value
immediately, if we don't want to wait for it. However, it is a bit fiddly (e.g.
the DSL execution doesn't know the context entity, so I'm making some (small)
changes to the subscription context as well).
Even longer term, we could look at reversing the responsibilities: if the value
is an {{attributeWhenReady}} then that could do the subscription itself, and
call back to the transformer when the value becomes available (or changes).
Also, by using callbacks more, we can change some of the places that we do
blocking calls (so we improve our thread usage).
Below is the current sequence of task being executed (and the thread's
stacktrace for each), when executing {{TransformerEnricherWithDslTest}} and the
transformer retrieving the value:
{noformat}
task=Task[waiting on port]@J5UlSQ04;
thread=Thread[brooklyn-execmanager-S0LtDxD8-9,5,main];
submitTime=1476272773046; startTime=1476272773046; queuedTime=-1
task=Task[formatting '%d' with 1 task]@lNzNRuj1;
thread=Thread[brooklyn-execmanager-S0LtDxD8-11,5,main];
submitTime=1476272773045; startTime=1476272773045; queuedTime=-1
task=Task[formatting '%s' with 1 task]@ExQmzZBq;
thread=Thread[brooklyn-execmanager-S0LtDxD8-10,5,main];
submitTime=1476272773045; startTime=1476272773045; queuedTime=-1
task=Task[Resolving dependent value]@wQxaXZ77;
thread=Thread[brooklyn-execmanager-S0LtDxD8-7,5,main];
submitTime=1476272773024; startTime=1476272773025; queuedTime=-1
task=Task[LSM.publishInitialValue(Application[9ftxjzc5], Sensor: port
(java.lang.Integer));[subscription-delivery-entity-pf9ftxjzc5[Application[9ftxjzc5]]]]@C1UYl8il;
thread=Thread[brooklyn-execmanager-S0LtDxD8-1,5,main];
submitTime=1476272738192; startTime=1476272738192; queuedTime=-1
task=Task[waiting on port]@J5UlSQ04;
thread=Thread[brooklyn-execmanager-S0LtDxD8-9,5,main]
"brooklyn-execmanager-S0LtDxD8-9" daemon prio=5 tid=0x00007facf22ab800
nid=0x660b at breakpoint[0x0000700001be7000]
java.lang.Thread.State: RUNNABLE
at
org.apache.brooklyn.core.sensor.DependentConfiguration$WaitInTaskForAttributeReady.call(DependentConfiguration.java:252)
at
org.apache.brooklyn.util.core.task.BasicExecutionManager$SubmissionCallable.call(BasicExecutionManager.java:519)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
task=Task[formatting '%d' with 1 task]@lNzNRuj1;
thread=Thread[brooklyn-execmanager-S0LtDxD8-11,5,main]
"brooklyn-execmanager-S0LtDxD8-11" daemon prio=5 tid=0x00007facf3a8b800
nid=0x6c07 waiting on condition [0x0000700001ded000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000007f64e81c0> (a
java.util.concurrent.FutureTask)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:425)
at java.util.concurrent.FutureTask.get(FutureTask.java:187)
at
com.google.common.util.concurrent.ForwardingFuture.get(ForwardingFuture.java:63)
at org.apache.brooklyn.util.core.task.BasicTask.get(BasicTask.java:361)
at
org.apache.brooklyn.core.sensor.DependentConfiguration$1.call(DependentConfiguration.java:409)
at
org.apache.brooklyn.util.core.task.BasicExecutionManager$SubmissionCallable.call(BasicExecutionManager.java:519)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
task=Task[formatting '%s' with 1 task]@ExQmzZBq;
thread=Thread[brooklyn-execmanager-S0LtDxD8-10,5,main]
"brooklyn-execmanager-S0LtDxD8-10" daemon prio=5 tid=0x00007facf2898000
nid=0x6507 waiting on condition [0x0000700001cea000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000007f63a0cb8> (a
java.util.concurrent.FutureTask)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:425)
at java.util.concurrent.FutureTask.get(FutureTask.java:187)
at
com.google.common.util.concurrent.ForwardingFuture.get(ForwardingFuture.java:63)
at org.apache.brooklyn.util.core.task.BasicTask.get(BasicTask.java:361)
at
org.apache.brooklyn.core.sensor.DependentConfiguration$1.call(DependentConfiguration.java:409)
at
org.apache.brooklyn.util.core.task.BasicExecutionManager$SubmissionCallable.call(BasicExecutionManager.java:519)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
task=Task[Resolving dependent value]@wQxaXZ77;
thread=Thread[brooklyn-execmanager-S0LtDxD8-7,5,main]
"brooklyn-execmanager-S0LtDxD8-7" daemon prio=5 tid=0x00007facf13d6000
nid=0x5a07 waiting on condition [0x00007000018de000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000007f60309b0> (a
java.util.concurrent.FutureTask)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:425)
at java.util.concurrent.FutureTask.get(FutureTask.java:187)
at
com.google.common.util.concurrent.ForwardingFuture.get(ForwardingFuture.java:63)
at org.apache.brooklyn.util.core.task.BasicTask.get(BasicTask.java:361)
at
org.apache.brooklyn.camp.brooklyn.spi.dsl.BrooklynDslDeferredSupplier.get(BrooklynDslDeferredSupplier.java:129)
at
org.apache.brooklyn.util.core.task.ValueResolver$2.call(ValueResolver.java:334)
at
org.apache.brooklyn.util.core.task.DynamicSequentialTask$DstJob.call(DynamicSequentialTask.java:359)
at
org.apache.brooklyn.util.core.task.BasicExecutionManager$SubmissionCallable.call(BasicExecutionManager.java:519)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
task=Task[LSM.publishInitialValue(Application[9ftxjzc5], Sensor: port
(java.lang.Integer));[subscription-delivery-entity-pf9ftxjzc5[Application[9ftxjzc5]]]]@C1UYl8il;
thread=Thread[brooklyn-execmanager-S0LtDxD8-1,5,main]
"brooklyn-execmanager-S0LtDxD8-1" daemon prio=5 tid=0x00007facf136f800
nid=0x6203 waiting on condition [0x00007000019e0000]
java.lang.Thread.State: TIMED_WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000007f4ee2190> (a
java.util.concurrent.FutureTask)
at
java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:422)
at java.util.concurrent.FutureTask.get(FutureTask.java:199)
at
com.google.common.util.concurrent.ForwardingFuture.get(ForwardingFuture.java:69)
at org.apache.brooklyn.util.core.task.BasicTask.get(BasicTask.java:467)
at org.apache.brooklyn.util.core.task.BasicTask.get(BasicTask.java:434)
at org.apache.brooklyn.util.time.Durations.get(Durations.java:57)
at org.apache.brooklyn.util.time.Durations.get(Durations.java:70)
at
org.apache.brooklyn.util.core.task.ValueResolver.getMaybeInternal(ValueResolver.java:353)
at
org.apache.brooklyn.util.core.task.ValueResolver.getMaybe(ValueResolver.java:257)
at
org.apache.brooklyn.enricher.stock.Transformer$4.apply(Transformer.java:95)
at
org.apache.brooklyn.enricher.stock.Transformer$4.apply(Transformer.java:1)
at
org.apache.brooklyn.enricher.stock.AbstractTransformer.compute(AbstractTransformer.java:153)
at
org.apache.brooklyn.enricher.stock.AbstractTransformer.onEvent(AbstractTransformer.java:147)
at
org.apache.brooklyn.core.mgmt.internal.LocalSubscriptionManager$1.run(LocalSubscriptionManager.java:149)
at
org.apache.brooklyn.util.concurrent.CallableFromRunnable.call(CallableFromRunnable.java:43)
at
org.apache.brooklyn.util.core.task.BasicExecutionManager$SubmissionCallable.call(BasicExecutionManager.java:519)
at
org.apache.brooklyn.util.core.task.SingleThreadedScheduler$1.call(SingleThreadedScheduler.java:116)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
{noformat}
> The sensor Transformer enricher fails to resolve its targetValue
> indeterministically.
> -------------------------------------------------------------------------------------
>
> Key: BROOKLYN-356
> URL: https://issues.apache.org/jira/browse/BROOKLYN-356
> Project: Brooklyn
> Issue Type: Bug
> Reporter: Svetoslav Neykov
>
> The sensor {{Transformer}} enricher fails to resolve its {{targetValue}}
> indeterministically, especially so on loaded systems. This is especially
> problematic if sourceSensor/triggerSensors change infrequently or do not
> change ever (when using default values).
> Bueprints using the {{Transformer}} behave perfectly fine during development
> and on test setups, but will fail on production machines (i.e. under load).
> The code [doing the value
> resolving|https://github.com/apache/brooklyn-server/blob/b59e7463a9b337c2d0e7931cd420d5bac68d8549/core/src/main/java/org/apache/brooklyn/enricher/stock/Transformer.java#L90-L94]
> tries to do so in a non-blocking fashion by spinning a thread to try to
> resolve and cancelling it after a short while without guarantees it ever got
> scheduled to run. It's more likely to fail when nesting DSLs, for example
> nesting several levels of {{$brooklyn:formatString}} and ending with a
> {{$brooklyn:attributeWhenRready}}. It's a common pattern in moderately
> complex blueprints. It needs to schedule a thread for each nesting level thus
> maximizing the chance that the value will not be resolved in the allotted
> time even if resolvabe. This is especially bad for sensors which don't get
> updated, for example {{PortAttributeSensorAndConfigKey} set only when
> initializing the entity or any config values which are always resolvable.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)