Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2453#discussion_r156519020
  
    --- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
 ---
    @@ -625,4 +620,21 @@ public String toString() {
         private String getTopicsString() {
             return kafkaSpoutConfig.getSubscription().getTopicsString();
         }
    +    
    +    private static class PollablePartitionInfo {
    +        private final Set<TopicPartition> pollablePartitions;
    +        //The subset of earliest retriable offsets that are on pollable 
partitions
    +        private final Map<TopicPartition, Long> 
pollableEarliestRetriableOffsets;
    +        
    +        public PollablePartitionInfo(Set<TopicPartition> 
pollablePartitions, Map<TopicPartition, Long> earliestRetriableOffsets) {
    +            this.pollablePartitions = pollablePartitions;
    +            this.pollableEarliestRetriableOffsets = 
earliestRetriableOffsets.entrySet().stream()
    +                .filter(entry -> 
pollablePartitions.contains(entry.getKey()))
    +                .collect(Collectors.toMap(entry -> entry.getKey(), entry 
-> entry.getValue()));
    +        }
    +        
    +        public boolean isPollAllowed() {
    --- End diff --
    
    Building on the comment above, I would still call it something along the 
lines `isReadyToPoll()` or `isPollableRetryReady`, because that's what it is, a 
retry that is ready to poll. However, it's up to you and I am +1 after this.


---

Reply via email to