> On June 3, 2015, 6:44 a.m., Ewen Cheslack-Postava wrote: > > clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, > > line 1091 > > <https://reviews.apache.org/r/34789/diff/3/?file=976965#file976965line1091> > > > > Since poll() can trigger auto offset commits, and then the commits can > > block while polling() for some time, can we end up recursing in some bad > > situations, e.g. if we consistently cannot get a coordinator? > > > > We might need to keep track if a commit is outstanding and not try to > > commit again, or just update the values we're trying to commit. > > Ewen Cheslack-Postava wrote: > To clarify the issue: in poll(timeout), we check if it is time to > autocommit and call commit, which then calls commitOffsets. In commitOffsets, > we have a while(true) loop and in it we poll for both sync and async. If that > polling process takes long enough, then we could hit the next interval and > those poll() calls could trigger another call to commit. Now we have 2 calls > to commit on the stack. > > I don't think this is likely, and I'm not certain there's a condition > where you can get stuck in the loop that long when using async commits. But > since the logic in commitOffsets current checks response.isReady first, > *then* breaks if its async, I thought it might be possible that during a > connectivity issue with the coordinator, you might just get stuck in this > loop even in async commit mode and trigger this recursive commit behavior > (given a long enough outage/short enough auto commit interval). > > Jason Gustafson wrote: > I think there might be some confusion caused by the the overloading of > the poll method. The poll that is called in commitOffsets is basically just a > wrapper around client.poll() and doesn't recurse. Perhaps I can rename these > methods to clientPoll() to make this clearer?
Ah, yes, that makes sense. I was getting a bit confused when reading some of the code about which poll was being invoked. Adjusting the names would probably help. - Ewen ----------------------------------------------------------- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34789/#review86338 ----------------------------------------------------------- On June 4, 2015, 4:07 a.m., Jason Gustafson wrote: > > ----------------------------------------------------------- > This is an automatically generated e-mail. To reply, visit: > https://reviews.apache.org/r/34789/ > ----------------------------------------------------------- > > (Updated June 4, 2015, 4:07 a.m.) > > > Review request for kafka. > > > Bugs: KAFKA-2168 > https://issues.apache.org/jira/browse/KAFKA-2168 > > > Repository: kafka > > > Description > ------- > > KAFKA-2168; refactored callback handling to prevent unnecessary requests > > > KAFKA-2168; address review comments > > > KAFKA-2186; fix rebase error and checkstyle issue > > > Diffs > ----- > > clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java > 8f587bc0705b65b3ef37c86e0c25bb43ab8803de > > clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java > 1ca75f83d3667f7d01da1ae2fd9488fb79562364 > > clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java > PRE-CREATION > clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java > d301be4709f7b112e1f3a39f3c04cfa65f00fa60 > clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java > f50da825756938c193d7f07bee953e000e2627d9 > > clients/src/main/java/org/apache/kafka/clients/consumer/OffsetResetStrategy.java > PRE-CREATION > > clients/src/main/java/org/apache/kafka/clients/consumer/internals/BrokerResult.java > PRE-CREATION > > clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java > fac79951d50ef6f19cef5fe62cbc4582b27b145a > > clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorResult.java > PRE-CREATION > > clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedResult.java > PRE-CREATION > > clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java > c5e577ff98bea3de65e290d30065935a29b3247f > > clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java > cee75410127dd1b86c1156563003216d93a086b3 > > clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java > 677edd385f35d4262342b567262c0b874876d25b > > clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java > b06c4a73e2b4e9472cd772c8bc32bf4a29f431bb > > clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java > 419541011d652becf0cda7a5e62ce813cddb1732 > > clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java > e000cf8e10ebfacd6c9ee68d7b88ff8c157f73c6 > > Diff: https://reviews.apache.org/r/34789/diff/ > > > Testing > ------- > > > Thanks, > > Jason Gustafson > >