Github user srdo commented on a diff in the pull request:
https://github.com/apache/storm/pull/2465#discussion_r157346228
--- Diff:
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
---
@@ -225,6 +237,23 @@ private long doSeek(TopicPartition tp,
OffsetAndMetadata committedOffset) {
}
}
+ /**
+ * Checks If {@link OffsetAndMetadata} was committed by this topology,
either by this or another spout instance.
+ * This info is used to decide if {@link FirstPollOffsetStrategy}
should be applied
+ *
+ * @param committedOffset {@link OffsetAndMetadata} info committed to
Kafka
+ * @return true if this topology committed this {@link
OffsetAndMetadata}, false otherwise
+ */
+ private boolean isOffsetCommittedByThisTopology(OffsetAndMetadata
committedOffset) {
+ try {
+ return committedOffset != null &&
JSON_MAPPER.readValue(committedOffset.metadata(), KafkaSpoutMessageId.class)
--- End diff --
Nit: Move the null check outside this method, it's already checked before
using this function at one call site, and in the other I don't think it makes
the code more readable that it's moved in here. Also consider putting the
message id in a variable before doing the .getTopologyId, that way it's easier
to tell what's happening in case we get an NPE from this method.
---