Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2241#discussion_r158213916 --- Diff: storm-client/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java --- @@ -114,27 +136,29 @@ public void ack(Tuple input) { LOG.info("BOLT ack TASK: {} TIME: {} TUPLE: {}", taskId, delta, input); } BoltAckInfo boltAckInfo = new BoltAckInfo(input, taskId, delta); - boltAckInfo.applyOn(taskData.getUserContext()); - if (delta >= 0) { + boltAckInfo.applyOn(task.getUserContext()); + if (delta != 0) { ((BoltExecutorStats) executor.getStats()).boltAckedTuple( input.getSourceComponent(), input.getSourceStreamId(), delta); } } @Override public void fail(Tuple input) { + if(!ackingEnabled) + return; Set<Long> roots = input.getMessageId().getAnchors(); for (Long root : roots) { - executor.sendUnanchored(taskData, Acker.ACKER_FAIL_STREAM_ID, + task.sendUnanchored(Acker.ACKER_FAIL_STREAM_ID, new Values(root), executor.getExecutorTransfer()); } long delta = tupleTimeDelta((TupleImpl) input); if (isDebug) { LOG.info("BOLT fail TASK: {} TIME: {} TUPLE: {}", taskId, delta, input); } BoltFailInfo boltFailInfo = new BoltFailInfo(input, taskId, delta); - boltFailInfo.applyOn(taskData.getUserContext()); - if (delta >= 0) { + boltFailInfo.applyOn(task.getUserContext()); + if (delta != 0) { --- End diff -- Maybe you addressed above only. It should stay the same. https://github.com/apache/storm/blob/b8f76afceeb33afbe904f529d410cd650fbd6824/storm-core/src/clj/org/apache/storm/daemon/executor.clj#L831-L835
---