> On June 9, 2015, 7:58 p.m., Jun Rao wrote:
> > clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, 
> > lines 797-798
> > <https://reviews.apache.org/r/34789/diff/8/?file=980075#file980075line797>
> >
> >     Hmm, seekToBegining() is supposed to be a blocking call. Basically, at 
> > the end of the call, we expect the fetch offset to be set to the beginning. 
> > This is now changed to async, which doesn't match the intended behavior. We 
> > need to think through if this matters or not.
> >     
> >     Ditto for seekToEnd().
> 
> Jason Gustafson wrote:
>     Since we always update fetch positions before a new fetch and in 
> position(), it didn't seem necessary to make it synchronous. I thought this 
> handling might be more consistent with how new subscriptions are handled 
> (which are asynchronous and defer the initial offset fetch until the next 
> poll or position). That being said, I don't have a strong feeling about it, 
> so we could return to the blocking version.
> 
> Jun Rao wrote:
>     Making this async may be fine. One implication is that we call position() 
> immediately after seekToBeginning(), we may not be able to get the correct 
> offset.
> 
> Jason Gustafson wrote:
>     We should be able to get the right offset since we always update offsets 
> before returning the current position, but we might have to block for it. 
> It's similar to if you call subscribe(topic) and then try to get its position 
> immediately.
> 
> Jun Rao wrote:
>     That may work. However, if one calls seekToBegining() followed by 
> seekToEnd(), will we guarantee that position() returns the end offset?

Yes, this will work. The latest seek will overwrite any pending ones.


> On June 9, 2015, 7:58 p.m., Jun Rao wrote:
> > clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, 
> > lines 319-322
> > <https://reviews.apache.org/r/34789/diff/8/?file=980075#file980075line319>
> >
> >     Could we add an example of how to use the new wakeup() call, especially 
> > with closing the consumer properly? For example, does the consumer thread 
> > just catch the ConsumerWakeupException and then call close()?

I've added an example in the latest patch.


> On June 9, 2015, 7:58 p.m., Jun Rao wrote:
> > clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, 
> > lines 1039-1040
> > <https://reviews.apache.org/r/34789/diff/8/?file=980075#file980075line1039>
> >
> >     The returned response may be ready already after the offsetBefore call 
> > due to needing metadata refresh. Since we don't check the ready state 
> > immediately afterward, we may be delaying the processing of metadata 
> > refresh by the request timeout.
> 
> Jason Gustafson wrote:
>     This is a pretty good point. One of the reasons working with 
> NetworkClient is tricky is that you need several polls to complete a request: 
> one to connect, one to send, and one to receive. In this case, the result 
> might not be ready because we are in the middle of connecting to the broker, 
> in which case we need to call poll() to finish the connect. If we don't, then 
> then next request will just fail for the same reason. I'll look to see if 
> there's a way to fix this to avoid unnecessary calls to poll.

I struggled a bit trying to fix this. In the latest patch, I changed the notion 
of "remedy" to a "retryAction" and included polling as one of the possible 
actions. Then if the result is finished, we only would call poll if the result 
indicates that it's needed. The only case where I actually use this is when a 
connection has just been initiated.


- Jason


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34789/#review87190
-----------------------------------------------------------


On June 11, 2015, 9:10 p.m., Jason Gustafson wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/34789/
> -----------------------------------------------------------
> 
> (Updated June 11, 2015, 9:10 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-2168
>     https://issues.apache.org/jira/browse/KAFKA-2168
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> KAFKA-2168; refactored callback handling to prevent unnecessary requests
> 
> 
> KAFKA-2168; address review comments
> 
> 
> KAFKA-2168; fix rebase error and checkstyle issue
> 
> 
> KAFKA-2168; address review comments and add docs
> 
> 
> KAFKA-2168; handle polling with timeout 0
> 
> 
> KAFKA-2168; timeout=0 means return immediately
> 
> 
> KAFKA-2168; address review comments
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 
> 8f587bc0705b65b3ef37c86e0c25bb43ab8803de 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 
> 1ca75f83d3667f7d01da1ae2fd9488fb79562364 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java
>  PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
> d1d1ec178f60dc47d408f52a89e52886c1a093a2 
>   clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
> f50da825756938c193d7f07bee953e000e2627d9 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/OffsetResetStrategy.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
>  c1496a0851526f3c7d3905ce4bdff2129c83a6c1 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
>  56281ee15cc33dfc96ff64d5b1e596047c7132a4 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
>  e7cfaaad296fa6e325026a5eee1aaf9b9c0fe1fe 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
>  cee75410127dd1b86c1156563003216d93a086b3 
>   
> clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java 
> 677edd385f35d4262342b567262c0b874876d25b 
>   
> clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
>  b06c4a73e2b4e9472cd772c8bc32bf4a29f431bb 
>   
> clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
>  419541011d652becf0cda7a5e62ce813cddb1732 
>   
> clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java
>  ecc78cedf59a994fcf084fa7a458fe9ed5386b00 
>   
> clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
>  e000cf8e10ebfacd6c9ee68d7b88ff8c157f73c6 
> 
> Diff: https://reviews.apache.org/r/34789/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Jason Gustafson
> 
>

Reply via email to