Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2380#discussion_r147013717
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -255,26 +255,25 @@ private void throwKafkaConsumerInterruptedException() {
         }
     
         private boolean commit() {
    -        return isAtLeastOnce() && commitTimer.isExpiredResetOnTrue();    // timer != null for non auto commit mode
    +        return isAtLeastOnceProcessing() && commitTimer.isExpiredResetOnTrue();    // timer != null for non auto commit mode
         }
     
         private boolean poll() {
             final int maxUncommittedOffsets = kafkaSpoutConfig.getMaxUncommittedOffsets();
             final int readyMessageCount = retryService.readyMessageCount();
             final boolean poll = !waitingToEmit()
    -            //Check that the number of uncommitted, nonretriable tuples is less than the maxUncommittedOffsets limit
    -            //Accounting for retriable tuples this way still guarantees that the limit is followed on a per partition basis,
    -            //and prevents locking up the spout when there are too many retriable tuples
    -            && (numUncommittedOffsets - readyMessageCount < maxUncommittedOffsets
    -            || !isAtLeastOnce());
    +                // Check that the number of uncommitted, non-retriable tuples is less than the maxUncommittedOffsets limit.
    --- End diff --
    
    Done.

