Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2380#discussion_r146329456
  
    --- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
 ---
    @@ -438,55 +440,53 @@ private void commitOffsetsForAckedTuples() {
         // ======== Ack =======
         @Override
         public void ack(Object messageId) {
    -        if (!isAtLeastOnce()) {
    -            // Only need to keep track of acked tuples if commits are done 
based on acks
    -            return;
    -        }
    -
    +        // Only need to keep track of acked tuples if commits to Kafka are 
done after a tuple ack is received
             final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;
    -        if (!emitted.contains(msgId)) {
    -            if (msgId.isEmitted()) {
    -                LOG.debug("Received ack for message [{}], associated with 
tuple emitted for a ConsumerRecord that "
    -                    + "came from a topic-partition that this consumer 
group instance is no longer tracking "
    -                    + "due to rebalance/partition reassignment. No action 
taken.", msgId);
    +        if (isAtLeastOnceProcessing()) {
    +            if (!emitted.contains(msgId)) {
    +                if (msgId.isEmitted()) {
    +                    LOG.debug("Received ack for message [{}], associated 
with tuple emitted for a ConsumerRecord that "
    +                        + "came from a topic-partition that this consumer 
group instance is no longer tracking "
    +                        + "due to rebalance/partition reassignment. No 
action taken.", msgId);
    +                } else {
    +                    LOG.debug("Received direct ack for message [{}], 
associated with null tuple", msgId);
    +                }
                 } else {
    -                LOG.debug("Received direct ack for message [{}], 
associated with null tuple", msgId);
    +                Validate.isTrue(!retryService.isScheduled(msgId), "The 
message id " + msgId + " is queued for retry while being acked."
    +                    + " This should never occur barring errors in the 
RetryService implementation or the spout code.");
    +                
offsetManagers.get(msgId.getTopicPartition()).addToAckMsgs(msgId);
    +                emitted.remove(msgId);
                 }
    -        } else {
    -            Validate.isTrue(!retryService.isScheduled(msgId), "The message 
id " + msgId + " is queued for retry while being acked."
    -                + " This should never occur barring errors in the 
RetryService implementation or the spout code.");
    -            
offsetManagers.get(msgId.getTopicPartition()).addToAckMsgs(msgId);
    -            emitted.remove(msgId);
    +            tupleListener.onAck(msgId);
             }
    -        tupleListener.onAck(msgId);
         }
     
         // ======== Fail =======
         @Override
         public void fail(Object messageId) {
    -        if (!isAtLeastOnce()) {
    -            // Only need to keep track of failed tuples if commits are 
done based on acks
    -            return;
    -        }
    +        // Only need to keep track of failed tuples if commits to Kafka 
are done after a tuple ack is received
    +        if (isAtLeastOnceProcessing()) {
    --- End diff --
    
    Same as for ack


---

Reply via email to