Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2380#discussion_r147025782

    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -438,55 +440,53 @@ private void commitOffsetsForAckedTuples() {
         // ======== Ack =======
         @Override
         public void ack(Object messageId) {
    -        if (!isAtLeastOnce()) {
    -            // Only need to keep track of acked tuples if commits are done based on acks
    -            return;
    -        }
    -
    +        // Only need to keep track of acked tuples if commits to Kafka are done after a tuple ack is received
             final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;
    -        if (!emitted.contains(msgId)) {
    -            if (msgId.isEmitted()) {
    -                LOG.debug("Received ack for message [{}], associated with tuple emitted for a ConsumerRecord that "
    -                    + "came from a topic-partition that this consumer group instance is no longer tracking "
    -                    + "due to rebalance/partition reassignment. No action taken.", msgId);
    +        if (isAtLeastOnceProcessing()) {
    --- End diff --

    Btw, I don't think it's a big deal which approach we pick and stick with, because there's no statement after the if statement.
    @srdo Could we leave the decision on this review comment to @hmcl?
---