GitHub user HeartSaVioR commented on a diff in the pull request:
https://github.com/apache/storm/pull/2367#discussion_r143863556
--- Diff:
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
---
@@ -77,57 +78,58 @@ public void addToEmitMsgs(long offset) {
*/
public OffsetAndMetadata findNextCommitOffset() {
long currOffset;
- long nextEarliestUncommittedOffset = earliestUncommittedOffset;
+ long nextCommitOffset = committedOffset;
KafkaSpoutMessageId nextCommitMsg = null; // this is a convenience variable to make it faster to create OffsetAndMetadata
for (KafkaSpoutMessageId currAckedMsg : ackedMsgs) { // complexity is that of a linear scan on a TreeMap
currOffset = currAckedMsg.offset();
- if (currOffset == nextEarliestUncommittedOffset) { // found the next offset to commit
+ if (currOffset == nextCommitOffset) { // found the next offset to commit
nextCommitMsg = currAckedMsg;
- nextEarliestUncommittedOffset = currOffset + 1;
- } else if (currOffset > nextEarliestUncommittedOffset) {
- if (emittedOffsets.contains(nextEarliestUncommittedOffset)) {
- LOG.debug("topic-partition [{}] has non-contiguous offset [{}]."
+ nextCommitOffset = currOffset + 1;
+ } else if (currOffset > nextCommitOffset) {
+ if (emittedOffsets.contains(nextCommitOffset)) {
+ LOG.debug("topic-partition [{}] has non-sequential offset [{}]."
+ " It will be processed in a subsequent batch.", tp, currOffset);
break;
} else {
/*
- This case will arise in case of non contiguous offset being processed.
- So, if the topic doesn't contain offset = committedOffset + 1 (possible
+ This case will arise in case of non-sequential offset being processed.
+ So, if the topic doesn't contain offset = nextCommitOffset (possible
if the topic is compacted or deleted), the consumer should jump to
the next logical point in the topic. Next logical offset should be the
- first element after committedOffset in the ascending ordered emitted set.
+ first element after nextEarliestUncommittedOffset in the ascending ordered emitted set.
--- End diff --
The local variable `nextEarliestUncommittedOffset` appears to have been removed in this patch, but this comment still refers to it.
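For reference, a minimal standalone Java sketch of the scan the patched `findNextCommitOffset()` performs. It is not the Storm implementation: the class `NextCommitOffsetSketch` is hypothetical, offsets are plain `long`s rather than `KafkaSpoutMessageId`s, and it assumes (as the `+` lines suggest) that `committedOffset` holds the first offset not yet committed, and that "the first element after `nextCommitOffset`" in the emitted set can be found with `NavigableSet.ceiling`.

```java
import java.util.Arrays;
import java.util.NavigableSet;
import java.util.TreeSet;

/** Hypothetical sketch of the offset scan in OffsetManager.findNextCommitOffset(). */
public class NextCommitOffsetSketch {

    /**
     * Returns the next offset to commit (the first offset not yet committed),
     * or committedOffset unchanged if nothing new can be committed.
     *
     * @param committedOffset first offset not yet committed (assumed semantics)
     * @param ackedOffsets    acked offsets, ascending
     * @param emittedOffsets  emitted, not yet committed offsets, ascending
     */
    public static long findNextCommitOffset(long committedOffset,
                                            NavigableSet<Long> ackedOffsets,
                                            NavigableSet<Long> emittedOffsets) {
        long nextCommitOffset = committedOffset;
        for (long currOffset : ackedOffsets) { // linear scan in ascending order
            if (currOffset == nextCommitOffset) {
                // Contiguous with what is already committable: advance.
                nextCommitOffset = currOffset + 1;
            } else if (currOffset > nextCommitOffset) {
                if (emittedOffsets.contains(nextCommitOffset)) {
                    // Gap offset was emitted but not acked yet: stop, retry later.
                    break;
                }
                // Gap offset was never emitted (e.g. compacted or deleted):
                // jump to the first emitted offset at or after the gap.
                Long nextEmittedOffset = emittedOffsets.ceiling(nextCommitOffset);
                if (nextEmittedOffset != null && nextEmittedOffset == currOffset) {
                    nextCommitOffset = currOffset + 1;
                } else {
                    break;
                }
            }
            // currOffset < nextCommitOffset is ignored in this sketch.
        }
        return nextCommitOffset;
    }

    public static void main(String[] args) {
        // Offset 8 is absent from the emitted set, e.g. removed by compaction.
        NavigableSet<Long> emitted = new TreeSet<>(Arrays.asList(5L, 6L, 7L, 9L, 10L));
        NavigableSet<Long> acked = new TreeSet<>(Arrays.asList(5L, 6L, 7L, 9L));
        System.out.println(findNextCommitOffset(5L, acked, emitted)); // prints 10

        // Offset 8 was emitted but not acked, so the scan stops at 8.
        NavigableSet<Long> emittedWith8 = new TreeSet<>(Arrays.asList(5L, 6L, 7L, 8L, 9L));
        System.out.println(findNextCommitOffset(5L, acked, emittedWith8)); // prints 8
    }
}
```

With offset 8 missing from the emitted set, the scan treats it as a compacted gap, jumps to 9 and returns 10; when 8 was emitted but not yet acked, the scan stops at 8 so that offset is committed in a later pass.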
---