jerrypeng commented on issue #2692: Making emit, ack, and fail thread safe
URL: https://github.com/apache/incubator-heron/pull/2692#issuecomment-390423252
 
 
   @bornej here is an example of a bolt that acks and emits from a different 
thread
   
   ```
   public static class ExclamationBolt extends BaseRichBolt {
       private static final long serialVersionUID = -2267338658317778214L;
       private OutputCollector collector;
       private long nItems;
       private long startTime;
       private LinkedBlockingQueue<Tuple> queue = new LinkedBlockingQueue<>();
   
       @Override
       @SuppressWarnings("rawtypes")
       public void prepare(Map conf, TopologyContext context, OutputCollector 
acollector) {
         collector = acollector;
         nItems = 0;
         startTime = System.currentTimeMillis();
   
         Thread thread1 = new Thread(new Runnable() {
           private long pItems;
   
           @Override
           public void run() {
             System.out.println("start thread 1 ....");
             while (true) {
               Tuple tuple = null;
               try {
                 tuple = queue.take();
               } catch (InterruptedException e) {
                 e.printStackTrace();
               }
               collector.ack(tuple);
   
               ++pItems;
   
               if (pItems % 10000 == 0) {
                 long latency = System.currentTimeMillis() - startTime;
                 System.out.println("Bolt - 1 processed " + pItems + " tuples 
in " + latency + " ms");
               }
             }
           }
         });
   
         thread1.setName("thread-1");
         thread1.start();
       }
   
       @Override
       public void execute(Tuple tuple) {
         try {
           queue.put(tuple);
         } catch (InterruptedException e) {
           e.printStackTrace();
         }
   
         ++nItems;
         if (nItems % 10000 == 0) {
           long latency = System.currentTimeMillis() - startTime;
           System.out.println("Bolt - 0 processed " + nItems + " tuples in " + 
latency + " ms");
         }
       }
     }
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to