Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2465#discussion_r157349402
  
    --- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
 ---
    @@ -225,6 +237,23 @@ private long doSeek(TopicPartition tp, 
OffsetAndMetadata committedOffset) {
             }
         }
     
    +    /**
    +     * Checks If {@link OffsetAndMetadata} was committed by this topology, 
either by this or another spout instance.
    +     * This info is used to decide if {@link FirstPollOffsetStrategy} 
should be applied
    +     *
    +     * @param committedOffset {@link OffsetAndMetadata} info committed to 
Kafka
    +     * @return true if this topology committed this {@link 
OffsetAndMetadata}, false otherwise
    +     */
    +    private boolean isOffsetCommittedByThisTopology(OffsetAndMetadata 
committedOffset) {
    +        try {
    +            return committedOffset != null && 
JSON_MAPPER.readValue(committedOffset.metadata(), KafkaSpoutMessageId.class)
    --- End diff --
    
    I'd probably just return false from this method. Since the topology id is 
new for each deployment we know for certain that if there's no message id in 
the meta, then it wasn't committed by this topology.


---

Reply via email to