jerrypeng commented on issue #2692: Making emit, ack, and fail thread safe
URL: https://github.com/apache/incubator-heron/pull/2692#issuecomment-390423252

@bornej here is an example of a bolt that acks and emits from a different thread:
```
// Needs java.util.Map and java.util.concurrent.LinkedBlockingQueue, plus
// BaseRichBolt, OutputCollector, TopologyContext, and Tuple from the bolt API in use.
public static class ExclamationBolt extends BaseRichBolt {
  private static final long serialVersionUID = -2267338658317778214L;
  private OutputCollector collector;
  private long nItems;
  private long startTime;
  // Hands tuples from execute() over to the acking thread.
  private LinkedBlockingQueue<Tuple> queue = new LinkedBlockingQueue<>();

  @Override
  @SuppressWarnings("rawtypes")
  public void prepare(Map conf, TopologyContext context, OutputCollector acollector) {
    collector = acollector;
    nItems = 0;
    startTime = System.currentTimeMillis();

    Thread thread1 = new Thread(new Runnable() {
      private long pItems;

      @Override
      public void run() {
        System.out.println("start thread 1 ....");
        while (true) {
          Tuple tuple;
          try {
            tuple = queue.take();
          } catch (InterruptedException e) {
            // Stop here rather than falling through and acking a null tuple.
            Thread.currentThread().interrupt();
            return;
          }
          // Ack from this thread, not from the thread that calls execute().
          collector.ack(tuple);
          ++pItems;
          if (pItems % 10000 == 0) {
            long latency = System.currentTimeMillis() - startTime;
            System.out.println("Bolt - 1 processed " + pItems + " tuples in " + latency + " ms");
          }
        }
      }
    });
    thread1.setName("thread-1");
    thread1.start();
  }

  @Override
  public void execute(Tuple tuple) {
    try {
      queue.put(tuple);
    } catch (InterruptedException e) {
      // The tuple was not queued, so do not count it as processed.
      Thread.currentThread().interrupt();
      return;
    }
    ++nItems;
    if (nItems % 10000 == 0) {
      long latency = System.currentTimeMillis() - startTime;
      System.out.println("Bolt - 0 processed " + nItems + " tuples in " + latency + " ms");
    }
  }
}
```
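
The bolt above only calls `ack` on the second thread. As a rough, self-contained sketch of the same queue hand-off with an emit added, the snippet below runs without a topology. Everything in it is an assumption for illustration: the `Collector` interface is a hypothetical stand-in (not the real `OutputCollector`), tuples are plain strings, and the `STOP` sentinel is just one way to end the worker so the example terminates.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

final class HandOffSketch {
  /** Hypothetical stand-in for the two collector calls used here; not the real interface. */
  interface Collector {
    void emit(String value);
    void ack(String tuple);
  }

  // Sentinel that tells the worker to stop; compared by identity so it cannot match real input.
  private static final String STOP = new String("STOP");

  /** Hands each tuple to a worker thread that emits and then acks it; returns the recorded calls. */
  static List<String> run(List<String> tuples) {
    final List<String> calls = Collections.synchronizedList(new ArrayList<String>());
    final Collector collector = new Collector() {
      @Override public void emit(String value) { calls.add("emit:" + value); }
      @Override public void ack(String tuple) { calls.add("ack:" + tuple); }
    };
    final BlockingQueue<String> queue = new LinkedBlockingQueue<>();

    Thread worker = new Thread(new Runnable() {
      @Override
      public void run() {
        try {
          while (true) {
            String tuple = queue.take();
            if (tuple == STOP) {
              return;
            }
            // Both calls happen on the worker thread, not on the caller's thread.
            collector.emit(tuple + "!!!");
            collector.ack(tuple);
          }
        } catch (InterruptedException e) {
          // Restore the flag and exit instead of processing a missing tuple.
          Thread.currentThread().interrupt();
        }
      }
    });
    worker.setName("worker-sketch");
    worker.start();

    // The queue is unbounded, so add() never blocks or fails for lack of space.
    for (String tuple : tuples) {
      queue.add(tuple);
    }
    queue.add(STOP);
    try {
      worker.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while waiting for the worker", e);
    }
    return new ArrayList<>(calls);
  }

  public static void main(String[] args) {
    System.out.println(run(Arrays.asList("a", "b")));
  }
}
```

With a single worker the emit and ack for each tuple stay in order; with several workers sharing one collector, that ordering would depend on the collector itself being thread safe, which is what this PR is about.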
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

With regards,
Apache Git Services
