Github user askprasanna commented on a diff in the pull request:
https://github.com/apache/storm/pull/2156#discussion_r121465722
--- Diff:
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
---
@@ -64,6 +65,27 @@ public void addToAckMsgs(KafkaSpoutMessageId msgId) {
// O(Log N)
public void addToEmitMsgs(long offset) {
this.emittedOffsets.add(offset); // O(Log N)
}
+
+ public int getNumUncommittedOffsets() {
+ return this.emittedOffsets.size();
+ }
+
+ /**
+ * Gets the offset of the nth emitted message after the committed
offset.
+ * Example: If the committed offset is 0 and offsets 1, 2, 8, 10 have
been emitted,
+ * getNthUncommittedOffsetAfterCommittedOffset(3) returns 8.
+ *
+ * @param index The index of the message to get the offset for
+ * @return The offset
+ * @throws NoSuchElementException if the index is out of range
+ */
+ public long getNthUncommittedOffsetAfterCommittedOffset(int index) {
+ Iterator<Long> offsetIter = emittedOffsets.iterator();
+ for (int i = 0; i < index - 1; i++) {
+ offsetIter.next();
+ }
--- End diff --
Looks like an iterator is used under the covers to create the array (which
explains the ordering guarantee in the array). No change required.. :-)
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---