Github user roshannaik commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2241#discussion_r129736861
  
    --- Diff: 
storm-client/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java
 ---
    @@ -43,43 +45,59 @@
         private static final Logger LOG = 
LoggerFactory.getLogger(BoltOutputCollectorImpl.class);
     
         private final BoltExecutor executor;
    -    private final Task taskData;
    +    private final Task task;
         private final int taskId;
         private final Random random;
         private final boolean isEventLoggers;
    +    private final ExecutorTransfer xsfer;
    +    private boolean ackingEnabled;
         private final boolean isDebug;
     
    -    public BoltOutputCollectorImpl(BoltExecutor executor, Task taskData, 
int taskId, Random random,
    -                                   boolean isEventLoggers, boolean 
isDebug) {
    +    public BoltOutputCollectorImpl(BoltExecutor executor, Task taskData,  
Random random,
    +                                   boolean isEventLoggers, boolean 
ackingEnabled, boolean isDebug) {
             this.executor = executor;
    -        this.taskData = taskData;
    -        this.taskId = taskId;
    +        this.task = taskData;
    +        this.taskId = taskData.getTaskId();
             this.random = random;
             this.isEventLoggers = isEventLoggers;
    +        this.ackingEnabled = ackingEnabled;
             this.isDebug = isDebug;
    +        this.xsfer = executor.getExecutorTransfer();
         }
     
         public List<Integer> emit(String streamId, Collection<Tuple> anchors, 
List<Object> tuple) {
    -        return boltEmit(streamId, anchors, tuple, null);
    +        try {
    +            return boltEmit(streamId, anchors, tuple, null);
    +        } catch (InterruptedException e) {
    +            LOG.warn("Thread interrupted when emiting tuple.");
    +            throw new RuntimeException(e);
    +        }
         }
     
         @Override
         public void emitDirect(int taskId, String streamId, Collection<Tuple> 
anchors, List<Object> tuple) {
    -        boltEmit(streamId, anchors, tuple, taskId);
    +        try {
    +            boltEmit(streamId, anchors, tuple, taskId);
    +        } catch (InterruptedException e) {
    +            LOG.warn("Thread interrupted when emiting tuple.");
    +            throw new RuntimeException(e);
    +        }
         }
     
    -    private List<Integer> boltEmit(String streamId, Collection<Tuple> 
anchors, List<Object> values, Integer targetTaskId) {
    +    private List<Integer> boltEmit(String streamId, Collection<Tuple> 
anchors, List<Object> values, Integer targetTaskId) throws InterruptedException 
{
             List<Integer> outTasks;
             if (targetTaskId != null) {
    -            outTasks = taskData.getOutgoingTasks(targetTaskId, streamId, 
values);
    +            outTasks = task.getOutgoingTasks(targetTaskId, streamId, 
values);
             } else {
    -            outTasks = taskData.getOutgoingTasks(streamId, values);
    +            outTasks = task.getOutgoingTasks(streamId, values);
             }
     
    -        for (Integer t : outTasks) {
    -            Map<Long, Long> anchorsToIds = new HashMap<>();
    -            if (anchors != null) {
    -                for (Tuple a : anchors) {
    +        for (int i=0; i<outTasks.size(); ++i) {
    +            Integer t = outTasks.get(i);
    +            MessageId msgId;
    +            if (ackingEnabled && anchors != null) {
    +                final Map<Long, Long> anchorsToIds = new HashMap<>();
    +                for (Tuple a : anchors) {  //TODO: PERF: critical path. 
should avoid using iterators here and below
    --- End diff --
    
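    This is hard to fix because of the signature of the public emit() method: 
anchors is passed as a Collection<Tuple>, which can only be traversed with an 
iterator. I am deferring this out of this PR and have left a TODO note for the 
future.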


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to