GitHub user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2241#discussion_r158213916
  
    --- Diff: storm-client/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java ---
    @@ -114,27 +136,29 @@ public void ack(Tuple input) {
                 LOG.info("BOLT ack TASK: {} TIME: {} TUPLE: {}", taskId, delta, input);
             }
             BoltAckInfo boltAckInfo = new BoltAckInfo(input, taskId, delta);
    -        boltAckInfo.applyOn(taskData.getUserContext());
    -        if (delta >= 0) {
    +        boltAckInfo.applyOn(task.getUserContext());
    +        if (delta != 0) {
                 ((BoltExecutorStats) executor.getStats()).boltAckedTuple(
                         input.getSourceComponent(), input.getSourceStreamId(), delta);
             }
         }
     
         @Override
         public void fail(Tuple input) {
    +        if(!ackingEnabled)
    +            return;
             Set<Long> roots = input.getMessageId().getAnchors();
             for (Long root : roots) {
    -            executor.sendUnanchored(taskData, Acker.ACKER_FAIL_STREAM_ID,
    +            task.sendUnanchored(Acker.ACKER_FAIL_STREAM_ID,
                         new Values(root), executor.getExecutorTransfer());
             }
             long delta = tupleTimeDelta((TupleImpl) input);
             if (isDebug) {
                 LOG.info("BOLT fail TASK: {} TIME: {} TUPLE: {}", taskId, delta, input);
             }
             BoltFailInfo boltFailInfo = new BoltFailInfo(input, taskId, delta);
    -        boltFailInfo.applyOn(taskData.getUserContext());
    -        if (delta >= 0) {
    +        boltFailInfo.applyOn(task.getUserContext());
    +        if (delta != 0) {
    --- End diff --
    
    Maybe you only addressed the one above. This condition should stay the same too.
    
    
    https://github.com/apache/storm/blob/b8f76afceeb33afbe904f529d410cd650fbd6824/storm-core/src/clj/org/apache/storm/daemon/executor.clj#L831-L835

