> On June 3, 2015, 6:44 a.m., Ewen Cheslack-Postava wrote: > > This looks good so far. I think it's much easier to understand when all the > > blocking stuff happens at the KafkaConsumer level and each of the classes > > it uses only ever handles single requests. It'd be nice to document the > > basic architecture somewhere since it took me a bit to fully figure it out. > > (Unfortunately, since the javadocs for the consumer are in the > > implementation class KafkaConsumer instead of on the Consumer interface, we > > can't put this with the KafkaConsumer class...) > > > > Some notes in addition to the inline stuff: > > > > Some functionality has been pulled back up to KafkaConsumer, in a mild > > reversal of Guozhang's refactoring. It'd be nice to keep this to a minimum. > > The ones that stuck out to me were > > resetOffsets()/resetOffset()/offsetBefore(). I'm guessing you also couldn't > > figure out a way to keep it in Fetcher since the inner call to > > offsetBefore() requires that blocking loop? > > > > Some handling of DelayedResponses and its subclasses seem redundant/follows > > a common pattern and maybe could be refactored into utility code. However, > > there are few enough places it's happening now that I don't think it's a > > big deal. It does seem a bit wasteful that we have to continually create > > these DelayedResponse objects even in cases where we know we'll fail fast, > > but I suppose those cases should be unusual and the cost to allocate them > > isn't all that high. > > > > Finally, a readability/cleanliness thing. This patch adds more nested > > anonymous RequestCompletionHandler classes. I think these are fine as they > > are, but if the implementations get too long or branchy with all the > > various error conditions they can become unreadably over-indented. Taking > > some of the big ones and using named nested classes might help improve > > clarity, although it does separate the request initiating code from the > > response handling code. > > Jason Gustafson wrote: > Yeah, that's right. The offsetBefore method was the tricky one to deal > with. I tried to keep the lower level logic in Fetcher, but resetOffsets > ended up leaking into KafkaConsumer. That was before I started using the > DelayedResponse interface though, so there may be a way to move it back to > Fetcher. > > We could definitely reduce some of the DelayedResponse object creation > using several static instances. I think there are only a couple cases where > it is actually necessary to have a new instance. I also think the usage is > currently a little nasty since you have to make sure that the delayed > response is finished through all code paths. That could > > As you mention, it's kind of nice having the handler code right there in > the method. I actually kind of like the pattern used in some other cases > where instead of a nested class, an anonymous class is created which simply > delegates to a handler method.
Forgot to finish my thought about the nastiness of DelayedResponse usage... The issue is having to always ensure that the DelayedResponse is finished. If it doesn't get finished, then the consumer ends up retrying the request (after timing out). The code might just need to be restructured a little bit so that it's clearer that it does get finished in all cases. - Jason ----------------------------------------------------------- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34789/#review86338 ----------------------------------------------------------- On June 3, 2015, 12:10 a.m., Jason Gustafson wrote: > > ----------------------------------------------------------- > This is an automatically generated e-mail. To reply, visit: > https://reviews.apache.org/r/34789/ > ----------------------------------------------------------- > > (Updated June 3, 2015, 12:10 a.m.) > > > Review request for kafka. > > > Bugs: KAFKA-2168 > https://issues.apache.org/jira/browse/KAFKA-2168 > > > Repository: kafka > > > Description > ------- > > KAFKA-2168; refactored callback handling to prevent unnecessary requests > > > Diffs > ----- > > clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java > 8f587bc0705b65b3ef37c86e0c25bb43ab8803de > > clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java > 1ca75f83d3667f7d01da1ae2fd9488fb79562364 > > clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java > PRE-CREATION > clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java > d301be4709f7b112e1f3a39f3c04cfa65f00fa60 > clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java > f50da825756938c193d7f07bee953e000e2627d9 > > clients/src/main/java/org/apache/kafka/clients/consumer/internals/BrokerResponse.java > PRE-CREATION > > clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java > b2764df11afa7a99fce46d1ff48960d889032d14 > > clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorResponse.java > PRE-CREATION > > clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedResponse.java > PRE-CREATION > > clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java > ef9dd5238fbc771496029866ece1d85db6d7b7a5 > > clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java > cee75410127dd1b86c1156563003216d93a086b3 > > clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java > 677edd385f35d4262342b567262c0b874876d25b > > clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java > b06c4a73e2b4e9472cd772c8bc32bf4a29f431bb > > clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java > 419541011d652becf0cda7a5e62ce813cddb1732 > > clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java > e000cf8e10ebfacd6c9ee68d7b88ff8c157f73c6 > > Diff: https://reviews.apache.org/r/34789/diff/ > > > Testing > ------- > > > Thanks, > > Jason Gustafson > >