Ram,
Thanks for the prompt response. If we use the approach you suggested, we're 
dependent on the main thread's process call, i.e. tuples in the thread-safe 
queue get processed only when the main thread is processing incoming tuples. 
How can we explicitly trigger that processing when polling the delay queue?

Just for reference, here's a sample code snippet for our operator.


public class MyOperator extends BaseOperator implements
        Operator.ActivationListener<Context.OperatorContext> {

.....

    @InputPortFieldAnnotation
    public transient DefaultInputPort<String> kafkaStreamInput =
            new DefaultInputPort<String>() {
        List<String> errors = new ArrayList<String>();

        @Override
        public void process(String consumerRecord) {
            // Code for normal tuple process
            // Code to poll thread safe queue
        }

-------------------------------------
From: Munagala Ramanath <[email protected]>
To: [email protected]
CC: [email protected], Allan De Leon <[email protected]>, 
Tim Zhu <[email protected]>
Subject: Re: Occasional Out of order tuples when emitting from a thread
Date: 2017-02-21 10:08 (-0800)
List: 
[email protected]<https://lists.apache.org/[email protected]>

Please note that tuples should not be emitted by any thread other than the
main operator thread.

A common pattern is to use a thread-safe queue and have worker threads enqueue
tuples there; the main operator thread then pulls tuples from the queue and
emits them.
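
The pattern above can be sketched roughly as follows. This is a minimal,
framework-free illustration, not Apex code: QueueDrainingOperator,
offerFromWorker and the Consumer-based emitter are hypothetical stand-ins for
a real operator and its output port. It assumes the engine invokes process()
and endWindow() on the operator thread only; draining in endWindow() as well
as in process() means queued tuples do not have to wait for the next incoming
tuple.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

// Hypothetical stand-in for an operator; it does not use the Apex API.
class QueueDrainingOperator {
    // Worker threads add ready tuples here; only the operator thread removes them.
    private final Queue<String> ready = new ConcurrentLinkedQueue<>();

    // Stand-in for an output port's emit method (assumption for illustration).
    private final Consumer<String> emitter;

    QueueDrainingOperator(Consumer<String> emitter) {
        this.emitter = emitter;
    }

    // Called from worker threads: enqueues the tuple and never emits.
    void offerFromWorker(String tuple) {
        ready.add(tuple);
    }

    // Called on the operator thread for each incoming tuple.
    void process(String tuple) {
        drain();               // emit tuples the workers released earlier
        emitter.accept(tuple); // then emit the tuple that just arrived
    }

    // Called on the operator thread at the end of each window, so the queue
    // is drained even when no input tuples arrive.
    void endWindow() {
        drain();
    }

    // Emits everything currently queued; runs on the operator thread only.
    private void drain() {
        String tuple;
        while ((tuple = ready.poll()) != null) {
            emitter.accept(tuple);
        }
    }
}
```

Worker threads would call offerFromWorker() once their condition becomes true,
instead of calling emit themselves, so every emit stays on the operator thread
and inside a window.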

Ram

_______________________________________________________

Munagala V. Ramanath

Software Engineer

E: [email protected]<mailto:[email protected]> | M: (408) 331-5034 | 
Twitter: @UnknownRam

www.datatorrent.com  |  apex.apache.org

From: Sunil Parmar <[email protected]>
Date: Tuesday, February 21, 2017 at 10:05 AM
To: [email protected], [email protected]
Cc: Allan De Leon <[email protected]>, Tim Zhu <[email protected]>
Subject: Occasional Out of order tuples when emitting from a thread

Hi there,
We have the following setup:

  *   we have a generic operator that's processing tuples in its input port
  *   in the input port's process method, we check for a condition, and:
     *   if the condition is met, the tuple is emitted to the next operator 
right away (in the process method)
     *   otherwise, if the condition is not met, we store the tuple in a 
cache and use some threads that periodically check whether the condition has 
become true. Once the condition is true, the threads call the emit method on 
the stored tuples.

With this setup, we occasionally encounter the following error:
2017-02-15 17:29:09,364 ERROR com.datatorrent.stram.engine.GenericNode: 
Catastrophic Error: Out of sequence BEGIN_WINDOW tuple 58a4046100003b7f on port 
transformedJSON while expecting 58a4046100003b7e

Is there a way to make the above work correctly?
If not, can you recommend a better way of doing this?
How can we ensure window assignment is done synchronously before emitting 
tuples?

Thanks very much in advance...
-allan
