Github user roshannaik commented on a diff in the pull request: https://github.com/apache/storm/pull/2241#discussion_r129736861 --- Diff: storm-client/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java --- @@ -43,43 +45,59 @@ private static final Logger LOG = LoggerFactory.getLogger(BoltOutputCollectorImpl.class); private final BoltExecutor executor; - private final Task taskData; + private final Task task; private final int taskId; private final Random random; private final boolean isEventLoggers; + private final ExecutorTransfer xsfer; + private boolean ackingEnabled; private final boolean isDebug; - public BoltOutputCollectorImpl(BoltExecutor executor, Task taskData, int taskId, Random random, - boolean isEventLoggers, boolean isDebug) { + public BoltOutputCollectorImpl(BoltExecutor executor, Task taskData, Random random, + boolean isEventLoggers, boolean ackingEnabled, boolean isDebug) { this.executor = executor; - this.taskData = taskData; - this.taskId = taskId; + this.task = taskData; + this.taskId = taskData.getTaskId(); this.random = random; this.isEventLoggers = isEventLoggers; + this.ackingEnabled = ackingEnabled; this.isDebug = isDebug; + this.xsfer = executor.getExecutorTransfer(); } public List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple) { - return boltEmit(streamId, anchors, tuple, null); + try { + return boltEmit(streamId, anchors, tuple, null); + } catch (InterruptedException e) { + LOG.warn("Thread interrupted when emiting tuple."); + throw new RuntimeException(e); + } } @Override public void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple) { - boltEmit(streamId, anchors, tuple, taskId); + try { + boltEmit(streamId, anchors, tuple, taskId); + } catch (InterruptedException e) { + LOG.warn("Thread interrupted when emiting tuple."); + throw new RuntimeException(e); + } } - private List<Integer> boltEmit(String streamId, Collection<Tuple> anchors, List<Object> values, Integer targetTaskId) { + private List<Integer> boltEmit(String streamId, Collection<Tuple> anchors, List<Object> values, Integer targetTaskId) throws InterruptedException { List<Integer> outTasks; if (targetTaskId != null) { - outTasks = taskData.getOutgoingTasks(targetTaskId, streamId, values); + outTasks = task.getOutgoingTasks(targetTaskId, streamId, values); } else { - outTasks = taskData.getOutgoingTasks(streamId, values); + outTasks = task.getOutgoingTasks(streamId, values); } - for (Integer t : outTasks) { - Map<Long, Long> anchorsToIds = new HashMap<>(); - if (anchors != null) { - for (Tuple a : anchors) { + for (int i=0; i<outTasks.size(); ++i) { + Integer t = outTasks.get(i); + MessageId msgId; + if (ackingEnabled && anchors != null) { + final Map<Long, Long> anchorsToIds = new HashMap<>(); + for (Tuple a : anchors) { //TODO: PERF: critical path. should avoid using iterators here and below --- End diff -- This is hard to fix due to signature of the public emit() method. Deferring it out of this PR. Left a note for future.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---