[ 
https://issues.apache.org/jira/browse/BROOKLYN-356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15569708#comment-15569708
 ] 

Aled Sage commented on BROOKLYN-356:
------------------------------------

I talked this over with [~svet]. We've made a very short-term improvement by 
increasing the timeout. I'm looking at a longer-term fix where we don't execute 
so many tasks in different threads to get the value.

As Svet described, the {{Transformer}} executes a task to get the value (with a 
timeout, because the value might not be available yet). It is trying to do a 
non-blocking lookup of the value, but it uses the task API, which requires the 
work to be executed in another thread. It therefore uses a timeout to terminate 
the lookup if it hasn't returned promptly, but that gives a race condition: on a 
loaded system the timeout can expire before the task has even been scheduled, 
so a value that is available is reported as unresolved.
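
To illustrate the race in isolation, here is a minimal sketch using only {{java.util.concurrent}}. It is not Brooklyn's code: the class and method names are made up for the example, and a single-threaded pool with one blocked worker stands in for a loaded system. The value is available straight away, yet the timed lookup gives up because its task never gets a thread.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class; none of these names come from Brooklyn.
class TimeoutLookupRace {

    /** Runs the lookup in another thread and gives up after the timeout; null means "gave up". */
    static Integer lookupWithTimeout(ExecutorService executor, Callable<Integer> lookup, long timeoutMillis) {
        Future<Integer> future = executor.submit(lookup);
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true); // the lookup may never have started running
            return null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    /** The value (8080) is available at once, but the only worker thread is busy. */
    static Integer lookupOnSaturatedPool() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch release = new CountDownLatch(1);
        try {
            // Occupy the single worker until the latch is released.
            executor.submit(() -> { release.await(); return null; });
            return lookupWithTimeout(executor, () -> 8080, 50);
        } finally {
            release.countDown();
            executor.shutdownNow();
        }
    }

    /** Same lookup on an idle pool, with a generous timeout. */
    static Integer lookupOnIdlePool() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            return lookupWithTimeout(executor, () -> 8080, 5000);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("idle pool:      " + lookupOnIdlePool());
        System.out.println("saturated pool: " + lookupOnSaturatedPool());
    }
}
```

Increasing the timeout only makes the second case less likely; it does not remove it.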

A better solution for such a non-blocking lookup is to ask for the value 
immediately, and to get back a {{Maybe}} (similar to Guava's {{Optional}}, but 
it accepts null values). Each DSL (formatString, attributeWhenReady, etc) is of 
type {{DeferredSupplier}}. I'm adding another interface ({{ImmediateSupplier}} 
with a method {{Maybe<T> getImmediately()}}) that we'll use to get the value 
immediately, if we don't want to wait for it. However, it is a bit fiddly (e.g. 
the DSL execution doesn't know the context entity, so I'm making some small 
changes to the subscription context as well).
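
As a rough sketch of that idea (not the actual patch): the interface names {{DeferredSupplier}}, {{ImmediateSupplier}} and the method {{getImmediately()}} are taken from the description above, but everything else here is an assumption for illustration. In particular {{Maybe}} is a minimal stand-in rather than Brooklyn's class, and {{AttributeWhenReady}} and {{FormatString}} are simplified, hypothetical stand-ins for the DSL objects. The point is that nested DSLs can be resolved on the caller's thread, with no tasks and no timeout.

```java
import java.util.Locale;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Illustrative sketch only; these nested types are stand-ins, not Brooklyn's classes.
class ImmediateLookupSketch {

    /** Minimal Maybe: either absent, or present with a value that may be null. */
    static final class Maybe<T> {
        private final boolean present;
        private final T value;

        private Maybe(boolean present, T value) {
            this.present = present;
            this.value = value;
        }

        static <T> Maybe<T> of(T value) { return new Maybe<>(true, value); }
        static <T> Maybe<T> absent() { return new Maybe<>(false, null); }

        boolean isPresent() { return present; }

        T get() {
            if (!present) throw new IllegalStateException("value not available");
            return value;
        }
    }

    /** Blocking lookup: get() may wait until the value exists. */
    interface DeferredSupplier<T> extends Supplier<T> {}

    /** Non-blocking lookup: returns at once, absent if the value is not ready. */
    interface ImmediateSupplier<T> {
        Maybe<T> getImmediately();
    }

    /** Hypothetical stand-in for an attributeWhenReady DSL object. */
    static final class AttributeWhenReady<T> implements DeferredSupplier<T>, ImmediateSupplier<T> {
        private final CompletableFuture<T> future = new CompletableFuture<>();

        void publish(T value) { future.complete(value); }

        @Override public T get() { return future.join(); } // blocks until published

        @Override public Maybe<T> getImmediately() {
            return future.isDone() ? Maybe.of(future.join()) : Maybe.absent();
        }
    }

    /** Hypothetical stand-in for a formatString DSL object wrapping one argument. */
    static final class FormatString implements ImmediateSupplier<String> {
        private final String pattern;
        private final ImmediateSupplier<?> argument;

        FormatString(String pattern, ImmediateSupplier<?> argument) {
            this.pattern = pattern;
            this.argument = argument;
        }

        @Override public Maybe<String> getImmediately() {
            Maybe<?> resolved = argument.getImmediately(); // same thread, no task
            if (!resolved.isPresent()) return Maybe.absent();
            return Maybe.of(String.format(Locale.ROOT, pattern, resolved.get()));
        }
    }

    public static void main(String[] args) {
        AttributeWhenReady<Integer> port = new AttributeWhenReady<>();
        FormatString nested = new FormatString("%s", new FormatString("%d", port));
        System.out.println(nested.getImmediately().isPresent()); // false: port not set yet
        port.publish(8080);
        System.out.println(nested.getImmediately().get());       // 8080
    }
}
```

With this shape, "not ready yet" is an explicit absent result instead of a timeout, so the outcome no longer depends on thread scheduling.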

Even longer term, we could look at reversing the responsibilities: if the value 
is an {{attributeWhenReady}} then that could do the subscription itself, and 
call back to the transformer when the value becomes available (or changes). 
Also, by using callbacks more, we can change some of the places that we do 
blocking calls (so we improve our thread usage).
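
A very rough sketch of what reversing the responsibilities could look like, assuming a hypothetical {{ObservableAttribute}} class with {{whenReady}} and {{publish}} methods (these names are invented for the example and are not Brooklyn API). The value source holds the subscription and calls back the interested party, so no thread is parked waiting for a value.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Illustrative sketch only; ObservableAttribute is a hypothetical name.
class CallbackSketch {

    /** A value source that notifies listeners instead of being polled or waited on. */
    static final class ObservableAttribute<T> {
        private final List<Consumer<T>> listeners = new ArrayList<>();
        private boolean ready;
        private T value;

        /** Registers a listener; it is called at once if a value is already available. */
        synchronized void whenReady(Consumer<T> listener) {
            listeners.add(listener);
            if (ready) listener.accept(value);
        }

        /** Sets the value and notifies every listener (here: in the publishing thread). */
        synchronized void publish(T newValue) {
            ready = true;
            value = newValue;
            for (Consumer<T> listener : listeners) {
                listener.accept(newValue);
            }
        }
    }

    /** Plays the role of the transformer: recomputes its output on each callback. */
    static List<String> demo() {
        List<String> outputs = new ArrayList<>();
        ObservableAttribute<Integer> port = new ObservableAttribute<>();
        port.whenReady(p -> outputs.add("port=" + p)); // nothing blocks here
        port.publish(8080);
        port.publish(9090);
        return outputs;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [port=8080, port=9090]
    }
}
```

Calling listeners while holding the lock keeps the sketch short; a real implementation would more likely hand the callbacks to an executor.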

Below is the current sequence of tasks being executed (and the thread's 
stacktrace for each), when executing {{TransformerEnricherWithDslTest}} and the 
transformer is retrieving the value:

{noformat}
task=Task[waiting on port]@J5UlSQ04; thread=Thread[brooklyn-execmanager-S0LtDxD8-9,5,main]; submitTime=1476272773046; startTime=1476272773046; queuedTime=-1
task=Task[formatting '%d' with 1 task]@lNzNRuj1; thread=Thread[brooklyn-execmanager-S0LtDxD8-11,5,main]; submitTime=1476272773045; startTime=1476272773045; queuedTime=-1
task=Task[formatting '%s' with 1 task]@ExQmzZBq; thread=Thread[brooklyn-execmanager-S0LtDxD8-10,5,main]; submitTime=1476272773045; startTime=1476272773045; queuedTime=-1
task=Task[Resolving dependent value]@wQxaXZ77; thread=Thread[brooklyn-execmanager-S0LtDxD8-7,5,main]; submitTime=1476272773024; startTime=1476272773025; queuedTime=-1
task=Task[LSM.publishInitialValue(Application[9ftxjzc5], Sensor: port (java.lang.Integer));[subscription-delivery-entity-pf9ftxjzc5[Application[9ftxjzc5]]]]@C1UYl8il; thread=Thread[brooklyn-execmanager-S0LtDxD8-1,5,main]; submitTime=1476272738192; startTime=1476272738192; queuedTime=-1

task=Task[waiting on port]@J5UlSQ04; 
thread=Thread[brooklyn-execmanager-S0LtDxD8-9,5,main]
"brooklyn-execmanager-S0LtDxD8-9" daemon prio=5 tid=0x00007facf22ab800 
nid=0x660b at breakpoint[0x0000700001be7000]
   java.lang.Thread.State: RUNNABLE
        at 
org.apache.brooklyn.core.sensor.DependentConfiguration$WaitInTaskForAttributeReady.call(DependentConfiguration.java:252)
        at 
org.apache.brooklyn.util.core.task.BasicExecutionManager$SubmissionCallable.call(BasicExecutionManager.java:519)
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

task=Task[formatting '%d' with 1 task]@lNzNRuj1; 
thread=Thread[brooklyn-execmanager-S0LtDxD8-11,5,main]
"brooklyn-execmanager-S0LtDxD8-11" daemon prio=5 tid=0x00007facf3a8b800 
nid=0x6c07 waiting on condition [0x0000700001ded000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00000007f64e81c0> (a 
java.util.concurrent.FutureTask)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
        at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:425)
        at java.util.concurrent.FutureTask.get(FutureTask.java:187)
        at 
com.google.common.util.concurrent.ForwardingFuture.get(ForwardingFuture.java:63)
        at org.apache.brooklyn.util.core.task.BasicTask.get(BasicTask.java:361)
        at 
org.apache.brooklyn.core.sensor.DependentConfiguration$1.call(DependentConfiguration.java:409)
        at 
org.apache.brooklyn.util.core.task.BasicExecutionManager$SubmissionCallable.call(BasicExecutionManager.java:519)
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

task=Task[formatting '%s' with 1 task]@ExQmzZBq; 
thread=Thread[brooklyn-execmanager-S0LtDxD8-10,5,main]
"brooklyn-execmanager-S0LtDxD8-10" daemon prio=5 tid=0x00007facf2898000 
nid=0x6507 waiting on condition [0x0000700001cea000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00000007f63a0cb8> (a 
java.util.concurrent.FutureTask)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
        at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:425)
        at java.util.concurrent.FutureTask.get(FutureTask.java:187)
        at 
com.google.common.util.concurrent.ForwardingFuture.get(ForwardingFuture.java:63)
        at org.apache.brooklyn.util.core.task.BasicTask.get(BasicTask.java:361)
        at 
org.apache.brooklyn.core.sensor.DependentConfiguration$1.call(DependentConfiguration.java:409)
        at 
org.apache.brooklyn.util.core.task.BasicExecutionManager$SubmissionCallable.call(BasicExecutionManager.java:519)
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

task=Task[Resolving dependent value]@wQxaXZ77; 
thread=Thread[brooklyn-execmanager-S0LtDxD8-7,5,main]
"brooklyn-execmanager-S0LtDxD8-7" daemon prio=5 tid=0x00007facf13d6000 
nid=0x5a07 waiting on condition [0x00007000018de000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00000007f60309b0> (a 
java.util.concurrent.FutureTask)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
        at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:425)
        at java.util.concurrent.FutureTask.get(FutureTask.java:187)
        at 
com.google.common.util.concurrent.ForwardingFuture.get(ForwardingFuture.java:63)
        at org.apache.brooklyn.util.core.task.BasicTask.get(BasicTask.java:361)
        at 
org.apache.brooklyn.camp.brooklyn.spi.dsl.BrooklynDslDeferredSupplier.get(BrooklynDslDeferredSupplier.java:129)
        at 
org.apache.brooklyn.util.core.task.ValueResolver$2.call(ValueResolver.java:334)
        at 
org.apache.brooklyn.util.core.task.DynamicSequentialTask$DstJob.call(DynamicSequentialTask.java:359)
        at 
org.apache.brooklyn.util.core.task.BasicExecutionManager$SubmissionCallable.call(BasicExecutionManager.java:519)
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

task=Task[LSM.publishInitialValue(Application[9ftxjzc5], Sensor: port 
(java.lang.Integer));[subscription-delivery-entity-pf9ftxjzc5[Application[9ftxjzc5]]]]@C1UYl8il;
 thread=Thread[brooklyn-execmanager-S0LtDxD8-1,5,main]
"brooklyn-execmanager-S0LtDxD8-1" daemon prio=5 tid=0x00007facf136f800 
nid=0x6203 waiting on condition [0x00007000019e0000]
   java.lang.Thread.State: TIMED_WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00000007f4ee2190> (a 
java.util.concurrent.FutureTask)
        at 
java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
        at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:422)
        at java.util.concurrent.FutureTask.get(FutureTask.java:199)
        at 
com.google.common.util.concurrent.ForwardingFuture.get(ForwardingFuture.java:69)
        at org.apache.brooklyn.util.core.task.BasicTask.get(BasicTask.java:467)
        at org.apache.brooklyn.util.core.task.BasicTask.get(BasicTask.java:434)
        at org.apache.brooklyn.util.time.Durations.get(Durations.java:57)
        at org.apache.brooklyn.util.time.Durations.get(Durations.java:70)
        at 
org.apache.brooklyn.util.core.task.ValueResolver.getMaybeInternal(ValueResolver.java:353)
        at 
org.apache.brooklyn.util.core.task.ValueResolver.getMaybe(ValueResolver.java:257)
        at 
org.apache.brooklyn.enricher.stock.Transformer$4.apply(Transformer.java:95)
        at 
org.apache.brooklyn.enricher.stock.Transformer$4.apply(Transformer.java:1)
        at 
org.apache.brooklyn.enricher.stock.AbstractTransformer.compute(AbstractTransformer.java:153)
        at 
org.apache.brooklyn.enricher.stock.AbstractTransformer.onEvent(AbstractTransformer.java:147)
        at 
org.apache.brooklyn.core.mgmt.internal.LocalSubscriptionManager$1.run(LocalSubscriptionManager.java:149)
        at 
org.apache.brooklyn.util.concurrent.CallableFromRunnable.call(CallableFromRunnable.java:43)
        at 
org.apache.brooklyn.util.core.task.BasicExecutionManager$SubmissionCallable.call(BasicExecutionManager.java:519)
        at 
org.apache.brooklyn.util.core.task.SingleThreadedScheduler$1.call(SingleThreadedScheduler.java:116)
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
{noformat}


> The sensor Transformer enricher fails to resolve its targetValue 
> indeterministically.
> -------------------------------------------------------------------------------------
>
>                 Key: BROOKLYN-356
>                 URL: https://issues.apache.org/jira/browse/BROOKLYN-356
>             Project: Brooklyn
>          Issue Type: Bug
>            Reporter: Svetoslav Neykov
>
> The sensor {{Transformer}} enricher fails to resolve its {{targetValue}} 
> indeterministically, especially so on loaded systems. This is especially 
> problematic if sourceSensor/triggerSensors change infrequently or do not 
> change ever (when using default values).
> Blueprints using the {{Transformer}} behave perfectly fine during development 
> and on test setups, but will fail on production machines (i.e. under load).
> The code [doing the value 
> resolving|https://github.com/apache/brooklyn-server/blob/b59e7463a9b337c2d0e7931cd420d5bac68d8549/core/src/main/java/org/apache/brooklyn/enricher/stock/Transformer.java#L90-L94]
>  tries to do so in a non-blocking fashion by spinning a thread to try to 
> resolve and cancelling it after a short while without guarantees it ever got 
> scheduled to run. It's more likely to fail when nesting DSLs, for example 
> nesting several levels of {{$brooklyn:formatString}} and ending with a 
> {{$brooklyn:attributeWhenReady}}. It's a common pattern in moderately 
> complex blueprints. It needs to schedule a thread for each nesting level, thus 
> maximizing the chance that the value will not be resolved in the allotted 
> time even if resolvable. This is especially bad for sensors which don't get 
> updated, for example {{PortAttributeSensorAndConfigKey}} set only when 
> initializing the entity, or any config values which are always resolvable.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
