Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2367#discussion_r143863556 --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java --- @@ -77,57 +78,58 @@ public void addToEmitMsgs(long offset) { */ public OffsetAndMetadata findNextCommitOffset() { long currOffset; - long nextEarliestUncommittedOffset = earliestUncommittedOffset; + long nextCommitOffset = committedOffset; KafkaSpoutMessageId nextCommitMsg = null; // this is a convenience variable to make it faster to create OffsetAndMetadata for (KafkaSpoutMessageId currAckedMsg : ackedMsgs) { // complexity is that of a linear scan on a TreeMap currOffset = currAckedMsg.offset(); - if (currOffset == nextEarliestUncommittedOffset) { // found the next offset to commit + if (currOffset == nextCommitOffset) { // found the next offset to commit nextCommitMsg = currAckedMsg; - nextEarliestUncommittedOffset = currOffset + 1; - } else if (currOffset > nextEarliestUncommittedOffset) { - if (emittedOffsets.contains(nextEarliestUncommittedOffset)) { - LOG.debug("topic-partition [{}] has non-contiguous offset [{}]." + nextCommitOffset = currOffset + 1; + } else if (currOffset > nextCommitOffset) { + if (emittedOffsets.contains(nextCommitOffset)) { + LOG.debug("topic-partition [{}] has non-sequential offset [{}]." + " It will be processed in a subsequent batch.", tp, currOffset); break; } else { /* - This case will arise in case of non contiguous offset being processed. - So, if the topic doesn't contain offset = committedOffset + 1 (possible + This case will arise in case of non-sequential offset being processed. + So, if the topic doesn't contain offset = nextCommitOffset (possible if the topic is compacted or deleted), the consumer should jump to the next logical point in the topic. Next logical offset should be the - first element after committedOffset in the ascending ordered emitted set. 
+ first element after nextEarliestUncommittedOffset in the ascending ordered emitted set. --- End diff -- The local variable `nextEarliestUncommittedOffset` appears to have been removed in this patch (it was replaced by `nextCommitOffset`), but this comment still refers to it.
---