A JIRA has been created for adding this thread affinity check: https://issues.apache.org/jira/browse/APEXCORE-510

I have implemented the enhancement in a branch, https://github.com/sanjaypujare/apex-core/tree/malhar-510.thread_affinity , and I am benchmarking its performance. I will publish the results in the JIRA above, where we can discuss them and hopefully agree on merging the change.
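
For readers following the thread, here is a minimal sketch of the kind of check under discussion: remember the thread that called setup() and compare it by reference on every emit(). This is not the code in the branch above. CheckedPort, its checkEnabled flag, and workerEmitRejected() are hypothetical names used only for illustration, and the class is a stand-in rather than the Apex DefaultOutputPort or Sink.

```java
import java.util.ArrayList;
import java.util.List;

public class ThreadAffinitySketch {

    /** Hypothetical stand-in for an output port; NOT the Apex DefaultOutputPort. */
    static class CheckedPort<T> {
        private final List<T> emitted = new ArrayList<>();
        private final boolean checkEnabled; // assumption: the check can be switched off
        private volatile Thread owner;

        CheckedPort(boolean checkEnabled) {
            this.checkEnabled = checkEnabled;
        }

        /** Expected to be called once, on the operator thread. */
        void setup() {
            owner = Thread.currentThread();
        }

        void emit(T tuple) {
            // Reference comparison only; no thread-name lookup is involved.
            if (checkEnabled && Thread.currentThread() != owner) {
                throw new IllegalStateException(
                    "emit() called on " + Thread.currentThread() + ", expected " + owner);
            }
            emitted.add(tuple);
        }

        int size() {
            return emitted.size();
        }
    }

    /**
     * Emits once on the calling thread and once from a worker thread.
     * Returns true if only the worker's emit was rejected.
     */
    public static boolean workerEmitRejected() {
        CheckedPort<String> port = new CheckedPort<>(true);
        port.setup();
        port.emit("ok");
        final boolean[] rejected = {false};
        Thread worker = new Thread(() -> {
            try {
                port.emit("bad");
            } catch (IllegalStateException e) {
                rejected[0] = true;
            }
        });
        worker.start();
        try {
            worker.join(); // join() makes the worker's write to rejected[0] visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return rejected[0] && port.size() == 1;
    }

    public static void main(String[] args) {
        System.out.println("worker emit rejected: " + workerEmitRejected());
    }
}
```

The comparison itself is a single reference check per tuple; whether that cost is acceptable on the real emit path is what the benchmarking is meant to show.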

On Thu, Aug 11, 2016 at 1:41 PM, Sanjay Pujare <san...@datatorrent.com> wrote:

> You are right, I was subconsciously thinking about the THREAD_LOCAL case with a single container and a simple DAG and in that case Vlad’s assumption might not be valid but may be it is.
>
> On 8/11/16, 11:47 AM, "Munagala Ramanath" <r...@datatorrent.com> wrote:
>
> > If I understand Vlad correctly, what he is saying is that each operator saves currentThread in its own setup() and checks it in its own output methods. The threads in different operators are running potentially on different nodes and/or processes and there will be no connection between them.
> >
> > Ram
> >
> > On Thu, Aug 11, 2016 at 11:41 AM, Sanjay Pujare <san...@datatorrent.com> wrote:
> >
> > > Name check is expensive, agreed, but there isn’t anything else currently. Ideally the stram engine (considering that it is an engine providing resources like threads etc) should use a ThreadFactory or a ThreadGroup to create operator threads so identification and adding functionality is easier.
> > >
> > > The idea of checking for the same thread between setup() and emit() won’t work because the emit() check will have to be in the Sink hierarchy and AFAIK a Sink object doesn’t have access to the corresponding operator, right? Another more fundamental problem probably is that these threads don’t have to match. The emit() for any operator (or rather a Sink related to an operator) is ultimately triggered by an emitTuple() on the topmost input operator in that path which happens in that input operator’s thread which doesn’t have to match the thread calling setup() in the downstream operators, right?
> > >
> > > On 8/11/16, 10:59 AM, "Vlad Rozov" <v.ro...@datatorrent.com> wrote:
> > >
> > > > Name verification is too expensive, it will be sufficient to store currentThread during setup() and verify that it is the same during emit. Checks should be supported not only for DefaultOutputPort, so we may have it implemented in various Sinks.
> > > >
> > > > Vlad
> > > >
> > > > On 8/11/16 10:21, Sanjay Pujare wrote:
> > > >
> > > > > Thinking more about this – all of the “operator” threads are created by the Stram engine with appropriate names. So we can put checks in the DefaultOutputPort.emit() or in the various implementations of Sink.put() that the current-thread is one created by the Stram engine (by verifying the name).
> > > > >
> > > > > We can even use a special Thread object for operator threads so the above detection is easier.
> > > > >
> > > > > On 8/10/16, 6:11 PM, "Amol Kekre" <a...@datatorrent.com> wrote:
> > > > >
> > > > > > +1 on debug proposal. Even if tuples lands up within the window, it breaks all guarantees. A rerun (after restart from a checkpoint) can have tuples in different windows from this thread. A separate thread simply exposes users to unwarranted risks.
> > > > > >
> > > > > > Thks
> > > > > > Amol
> > > > > >
> > > > > > On Wed, Aug 10, 2016 at 6:05 PM, Vlad Rozov <v.ro...@datatorrent.com> wrote:
> > > > > >
> > > > > > > Tuples emitted between end and begin windows is only one of possible behaviors that emitting tuples on a separate from the operator thread may introduce. It will be good to have both checks in place at run-time and if checking for the operator thread for every emitted tuple is too expensive, we may have it enabled only in DEBUG or mode with more checks in place.
> > > > > > >
> > > > > > > Vlad
> > > > > > >
> > > > > > > > Sanjay just reminded me of my typo -> I meant between end_window and start_window :)
> > > > > > > >
> > > > > > > > Thks
> > > > > > > > Amol
> > > > > > > >
> > > > > > > > On Wed, Aug 10, 2016 at 2:36 PM, Sanjay Pujare <san...@datatorrent.com> wrote:
> > > > > > > >
> > > > > > > > > If the goal is to do this validation through static analysis of operator code, I guess it is possible but is going to be non-trivial. And there could be false positives and false negatives.
> > > > > > > > >
> > > > > > > > > Also I suppose this discussion applies to processor operators (those having both in and out ports) so Ram’s example of JdbcPollInputOperator may not be applicable here?
> > > > > > > > >
> > > > > > > > > On 8/10/16, 2:04 PM, "Ashwin Chandra Putta" <ashwinchand...@gmail.com> wrote:
> > > > > > > > >
> > > > > > > > > > In a separate thread I mean.
> > > > > > > > > >
> > > > > > > > > > Regards,
> > > > > > > > > > Ashwin.
> > > > > > > > > >
> > > > > > > > > > On Wed, Aug 10, 2016 at 2:01 PM, Ashwin Chandra Putta <ashwinchand...@gmail.com> wrote:
> > > > > > > > > >
> > > > > > > > > > > + dev@apex.apache.org
> > > > > > > > > > > - us...@apex.apache.org
> > > > > > > > > > >
> > > > > > > > > > > This is one of those best practices that we learn by experience during operator development. It will save a lot of time during operator development if we can catch and throw validation error when someone emits tuples in a non separate thread.
> > > > > > > > > > >
> > > > > > > > > > > Regards,
> > > > > > > > > > > Ashwin
> > > > > > > > > > >
> > > > > > > > > > > On Wed, Aug 10, 2016 at 1:57 PM, Munagala Ramanath <r...@datatorrent.com> wrote:
> > > > > > > > > > >
> > > > > > > > > > > > For cases where use of a different thread is needed, it can write tuples to a queue from where the operator thread pulls them -- JdbcPollInputOperator in Malhar has an example.
> > > > > > > > > > > >
> > > > > > > > > > > > Ram
> > > > > > > > > > > >
> > > > > > > > > > > > On Wed, Aug 10, 2016 at 1:50 PM, hsy...@gmail.com <hsy...@gmail.com> wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > Hey Vlad,
> > > > > > > > > > > > >
> > > > > > > > > > > > > Thanks for bringing this up. Is there an easy way to detect unexpected use of emit method without hurt the performance. Or at least if we can detect this in debug mode.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Regards,
> > > > > > > > > > > > > Siyuan
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Wed, Aug 10, 2016 at 11:27 AM, Vlad Rozov <v.ro...@datatorrent.com> wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > > The short answer is no, creating worker thread to emit tuples is not supported by Apex and will lead to an undefined behavior. Operators in Apex have strong thread affinity and all interaction with the platform must happen on the operator thread.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Vlad
> > > > > > > > > > >
> > > > > > > > > > > --
> > > > > > > > > > >
> > > > > > > > > > > Regards,
> > > > > > > > > > > Ashwin.
> > > > > > > > > >
> > > > > > > > > > --
> > > > > > > > > >
> > > > > > > > > > Regards,
> > > > > > > > > > Ashwin.
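
For reference, the queue hand-off that Ram describes in the quoted thread (a worker thread only enqueues, and the operator thread alone emits) could be sketched roughly as follows. This is not the JdbcPollInputOperator code from Malhar. PollingOperator, activate(int), and runOnce() are hypothetical names, and the emitted list merely stands in for an output port.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

public class QueueHandoffSketch {

    /** Hypothetical operator; it does not implement any Apex interface. */
    static class PollingOperator {
        // Thread-safe queue shared by the worker and the operator thread.
        private final ConcurrentLinkedQueue<Integer> pending = new ConcurrentLinkedQueue<>();
        // Stand-in for an output port; touched only by the operator thread.
        private final List<Integer> emitted = new ArrayList<>();
        private Thread worker;

        /** Starts a worker that produces tuples but never emits them itself. */
        void activate(int count) {
            worker = new Thread(() -> {
                for (int i = 0; i < count; i++) {
                    pending.add(i);
                }
            });
            worker.start();
        }

        /** Called on the operator thread: drains the queue and emits each tuple. */
        int emitTuples() {
            int n = 0;
            Integer tuple;
            while ((tuple = pending.poll()) != null) {
                emitted.add(tuple);
                n++;
            }
            return n;
        }
    }

    /** Runs the worker to completion, then drains on the calling thread. */
    public static List<Integer> runOnce(int count) {
        PollingOperator op = new PollingOperator();
        op.activate(count);
        try {
            op.worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        op.emitTuples();
        return op.emitted;
    }

    public static void main(String[] args) {
        System.out.println(runOnce(3));
    }
}
```

In a running operator the drain would happen repeatedly while the worker is still producing; the join() here only keeps the example deterministic.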