> On June 15, 2015, 6:09 p.m., Jun Rao wrote:
> > clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, 
> > line 1199
> > <https://reviews.apache.org/r/34789/diff/9/?file=983137#file983137line1199>
> >
> >     Doesn't this cause poll() to block for the backoff time?

Generally it wouldn't, but there are a couple of cases where it could. For 
example, we could be waiting to discover the coordinator, or we might need to 
fetch some partition offsets. In either case, we could block longer than the 
poll timeout. Note that this is consistent with the old version. I had hoped to 
address it as part of solving KAFKA-1894.


> On June 15, 2015, 6:09 p.m., Jun Rao wrote:
> > clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, 
> > line 928
> > <https://reviews.apache.org/r/34789/diff/9/?file=983137#file983137line928>
> >
> >     Could you explain a bit more how wakeup() can be used for commit 
> > offsets or seek to a new position? Does a separate thread have to first 
> > save some partitions for committing offsets in the poll thread and then 
> > wake it up? Then the poll thread will initiate the offset commit before the 
> > next poll request?

It's admittedly a little awkward, but you could do it the way you suggest. For 
example, you could use a flag and check it in the poll thread:

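```
while (true) {
  // needCommit is set by another thread just before it calls consumer.wakeup()
  if (needCommit.get()) {
    consumer.commit(offsets);
    needCommit.set(false);
  }

  try {
    records = consumer.poll(50000);
    // submit records to executor perhaps
  } catch (ConsumerWakeupException e) {
    // woken up by the other thread; loop around and check the flag
  }
}
```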

Then the other thread just needs to record the offsets that need committing, 
set the flag, and wake up the consumer.
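
To make the handoff concrete, here is a rough, self-contained sketch of the 
flag-and-wakeup pattern. FakeConsumer, WakeupSignal, and demo() are 
hypothetical stand-ins so that it runs without a broker; none of them is the 
KafkaConsumer API, and the real loop would keep polling instead of returning 
after one commit.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

class FlagCommitSketch {
    /** Hypothetical signal that a blocked poll() was cut short by wakeup(). */
    static class WakeupSignal extends RuntimeException {}

    /** Hypothetical stand-in for the consumer; NOT the Kafka API. */
    static class FakeConsumer {
        private final Object monitor = new Object();
        private boolean wakeupRequested = false;
        private long committed = -1L; // written only by the poll thread

        /** Blocks for up to timeoutMs, or throws if wakeup() was called. */
        void poll(long timeoutMs) {
            synchronized (monitor) {
                try {
                    if (!wakeupRequested) monitor.wait(timeoutMs);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                if (wakeupRequested) {
                    wakeupRequested = false;
                    throw new WakeupSignal();
                }
            }
        }

        /** The only method here that is safe to call from another thread. */
        void wakeup() {
            synchronized (monitor) {
                wakeupRequested = true; // sticky: also seen by a poll() that starts later
                monitor.notifyAll();
            }
        }

        void commit(long offset) { committed = offset; }
        long committed() { return committed; }
    }

    static long demo() {
        FakeConsumer consumer = new FakeConsumer();
        AtomicBoolean needCommit = new AtomicBoolean(false);
        AtomicLong offsetToCommit = new AtomicLong(-1L);

        Thread poller = new Thread(() -> {
            while (true) {
                // Clear the flag and commit in the poll thread itself.
                if (needCommit.getAndSet(false)) {
                    consumer.commit(offsetToCommit.get());
                    return; // stop here only so the demo terminates
                }
                try {
                    consumer.poll(50000);
                } catch (WakeupSignal e) {
                    // woken up; loop around and check the flag
                }
            }
        });
        poller.start();

        // The "other thread": publish the offset first, then the flag, then wake up.
        offsetToCommit.set(42L);
        needCommit.set(true);
        consumer.wakeup();

        try {
            poller.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return consumer.committed();
    }
}
```

The ordering in the requesting thread matters: the offset is published before 
the flag, so the poll thread never sees the flag without the offset.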

Alternatively, you could protect the consumer with a lock and commit the 
offsets from a separate thread. The difficulty with this solution is that you 
need to make sure that the polling thread does not simply reacquire the lock 
after being woken up. This problem can be dealt with by introducing a second 
lock or perhaps by getting tricky with a semaphore, but I'd probably recommend 
the previous solution instead since it's easier to understand.
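
For comparison, the second-lock variant might look roughly like the sketch 
below. This is only an illustration under assumptions: FakeConsumer, 
WakeupSignal, the "gate" lock, and demo() are hypothetical names and not part 
of the patch. The committing thread takes the gate before waking the consumer, 
and the polling thread has to pass through the gate before it may reacquire 
the consumer lock.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;

class TwoLockCommitSketch {
    /** Hypothetical signal that a blocked poll() was cut short by wakeup(). */
    static class WakeupSignal extends RuntimeException {}

    /** Hypothetical stand-in for the consumer; NOT the Kafka API. */
    static class FakeConsumer {
        private final Object monitor = new Object();
        private boolean wakeupRequested = false;
        final AtomicLong committed = new AtomicLong(-1L);

        void poll(long timeoutMs) {
            synchronized (monitor) {
                try {
                    if (!wakeupRequested) monitor.wait(timeoutMs);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                if (wakeupRequested) {
                    wakeupRequested = false;
                    throw new WakeupSignal();
                }
            }
        }

        void wakeup() {
            synchronized (monitor) {
                wakeupRequested = true;
                monitor.notifyAll();
            }
        }

        void commit(long offset) { committed.set(offset); }
    }

    static long demo() {
        FakeConsumer consumer = new FakeConsumer();
        ReentrantLock consumerLock = new ReentrantLock(); // guards the consumer
        ReentrantLock gate = new ReentrantLock();         // held by a waiting committer
        AtomicBoolean running = new AtomicBoolean(true);

        Thread poller = new Thread(() -> {
            while (running.get()) {
                // Pass through the gate first, so a committer that already
                // holds it gets the consumer lock before the next poll.
                gate.lock();
                gate.unlock();
                consumerLock.lock();
                try {
                    consumer.poll(50000);
                } catch (WakeupSignal e) {
                    // woken up; release the consumer lock and loop
                } finally {
                    consumerLock.unlock();
                }
            }
        });
        poller.start();

        // Committing thread: close the gate, wake the poller, then commit.
        gate.lock();
        try {
            consumer.wakeup();
            consumerLock.lock();
            try {
                consumer.commit(42L);
            } finally {
                consumerLock.unlock();
            }
        } finally {
            gate.unlock();
        }

        // Shut the poller down so the demo terminates.
        running.set(false);
        consumer.wakeup();
        try {
            poller.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return consumer.committed.get();
    }
}
```

Without the gate, the polling thread could release and immediately retake 
consumerLock before the committer ever gets it, which is the reacquire problem 
described above.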


> On June 15, 2015, 6:09 p.m., Jun Rao wrote:
> > clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, 
> > lines 686-687
> > <https://reviews.apache.org/r/34789/diff/9/?file=983137#file983137line686>
> >
> >     It's not very clear to me if the caller is woken up from a poll() call, 
> > whether the next poll() call will resume from the exact same state the 
> > previous one is left off.
> >     
> >     For example, let's say the first poll() call just issued a 
> > consumerMetadataRequest in ensureCoordinatorKnown() and then is woken up. 
> > In the next poll() call, do we just wait for the response from the previous 
> > consumerMetadataRequest or will it trigger a new consumerMetadataRequest? 
> > If it's the latter, will the two responses be processed out of order?

Waking up the consumer can leave requests pending. This could also happen if we 
change the code to actually respect the poll timeout (and I think this is the 
main reason why it hasn't been done). I was thinking about wakeup() as 
primarily a way to get timely shutdown behavior, but if it is also used for 
commits or seeks, then we have to solve this problem. I have thought about 
adding a cancel() operation to the RequestFuture. It wouldn't stop an in-flight 
request, but you could prevent its callback from being executed. This would 
probably be good enough to deal with a pending consumerMetadataRequest, but we 
wouldn't want to leave a JoinGroup request pending since resending it may cause 
unnecessary rebalancing. Perhaps we shouldn't allow wakeups in those cases? 
Maybe the only case where we should actually wake up is when we are polling 
for data?
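
As a rough illustration of the cancel() idea, something like the following 
could work. CancellableFuture and its methods are hypothetical names for this 
sketch; the RequestFuture in the patch has no cancel() today. The sketch is 
single-threaded on the assumption that responses are delivered from the poll 
thread.

```java
import java.util.function.Consumer;

class CancellableFutureSketch {
    /** Hypothetical future whose callback can be suppressed after the fact. */
    static class CancellableFuture<T> {
        private Consumer<T> callback;
        private boolean cancelled = false;

        void onComplete(Consumer<T> callback) {
            this.callback = callback;
        }

        /** Does not stop the in-flight request; it only drops the callback. */
        void cancel() {
            cancelled = true;
        }

        /** Invoked when the response for this request arrives. */
        void complete(T response) {
            if (!cancelled && callback != null) {
                callback.accept(response);
            }
        }
    }

    static String demo() {
        StringBuilder handled = new StringBuilder();

        // First request is sent, then poll() is woken up before the response.
        CancellableFuture<String> first = new CancellableFuture<>();
        first.onComplete(r -> handled.append(r).append(';'));
        first.cancel();

        // The next poll() sends a fresh request.
        CancellableFuture<String> second = new CancellableFuture<>();
        second.onComplete(r -> handled.append(r).append(';'));

        // Responses arrive out of order; only the live future is handled.
        second.complete("response-2");
        first.complete("response-1");

        return handled.toString();
    }
}
```

Here demo() returns "response-2;", since the late response to the cancelled 
request is silently dropped. This would not help for JoinGroup, where the 
problem is the resend itself and not the stale callback.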


- Jason


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34789/#review87880
-----------------------------------------------------------


On June 11, 2015, 9:10 p.m., Jason Gustafson wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/34789/
> -----------------------------------------------------------
> 
> (Updated June 11, 2015, 9:10 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-2168
>     https://issues.apache.org/jira/browse/KAFKA-2168
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> KAFKA-2168; refactored callback handling to prevent unnecessary requests
> 
> 
> KAFKA-2168; address review comments
> 
> 
> KAFKA-2168; fix rebase error and checkstyle issue
> 
> 
> KAFKA-2168; address review comments and add docs
> 
> 
> KAFKA-2168; handle polling with timeout 0
> 
> 
> KAFKA-2168; timeout=0 means return immediately
> 
> 
> KAFKA-2168; address review comments
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 8f587bc0705b65b3ef37c86e0c25bb43ab8803de 
>   clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 1ca75f83d3667f7d01da1ae2fd9488fb79562364 
>   clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java d1d1ec178f60dc47d408f52a89e52886c1a093a2 
>   clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java f50da825756938c193d7f07bee953e000e2627d9 
>   clients/src/main/java/org/apache/kafka/clients/consumer/OffsetResetStrategy.java PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java c1496a0851526f3c7d3905ce4bdff2129c83a6c1 
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java 56281ee15cc33dfc96ff64d5b1e596047c7132a4 
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java e7cfaaad296fa6e325026a5eee1aaf9b9c0fe1fe 
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java cee75410127dd1b86c1156563003216d93a086b3 
>   clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java 677edd385f35d4262342b567262c0b874876d25b 
>   clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java b06c4a73e2b4e9472cd772c8bc32bf4a29f431bb 
>   clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java 419541011d652becf0cda7a5e62ce813cddb1732 
>   clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java ecc78cedf59a994fcf084fa7a458fe9ed5386b00 
>   clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java e000cf8e10ebfacd6c9ee68d7b88ff8c157f73c6 
> 
> Diff: https://reviews.apache.org/r/34789/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Jason Gustafson
> 
>
