> On June 9, 2015, 7:58 p.m., Jun Rao wrote:
> > clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, line 1212
> > <https://reviews.apache.org/r/34789/diff/8/?file=980075#file980075line1212>
> >
> >     -1 makes the pollClient block forever. So, we don't get a chance to do
> >     the wakeup check.
>
> Jason Gustafson wrote:
>     I might be wrong, but I think we can still use NetworkClient.wakeup to
>     interrupt a poll call which is waiting forever.
>
> Jun Rao wrote:
>     Yes, you are right. I missed that.
>
> Jun Rao wrote:
>     Actually, this still seems to be a problem. The issue is that when
>     NetworkClient.wakeup is called, NetworkClient.poll may not be in progress.
>     In that case, the next NetworkClient.poll may still block for the timeout.
>
> Jason Gustafson wrote:
>     From the javadocs for Selector, the wakeup will apply to the next poll if
>     one is not in progress. But perhaps we should just check the wakeup flag
>     before entering the poll to be safe.
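For reference, the Selector behavior cited above can be checked in isolation. The following is a minimal standalone sketch against java.nio.channels.Selector only, not the NetworkClient code in this patch; the class and method names are made up for illustration. It calls wakeup() while no select is in progress and then times the next select(timeout), which the Selector javadoc says should return immediately.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;

// Hypothetical demo class; not part of the Kafka client.
class WakeupBeforeSelect {

    /**
     * Calls wakeup() while no selection operation is in progress, then
     * measures how long the following select(timeoutMs) takes to return.
     */
    static long elapsedMillisAfterEarlyWakeup(long timeoutMs) {
        try (Selector selector = Selector.open()) {
            selector.wakeup();               // nothing is blocked in select() yet
            long start = System.nanoTime();
            selector.select(timeoutMs);      // expected to return without waiting
            return (System.nanoTime() - start) / 1_000_000;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        long elapsed = elapsedMillisAfterEarlyWakeup(5_000);
        System.out.println("select(5000) returned after ~" + elapsed + " ms");
    }
}
```

If this holds for the Selector wrapped by NetworkClient, a wakeup issued between two polls is not lost, which is the point under discussion.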
Yes, in that case, this is not an issue. We probably don't have to check the
wakeup flag before the poll call, since the flag could change immediately after
the check.


> On June 9, 2015, 7:58 p.m., Jun Rao wrote:
> > clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, lines 797-798
> > <https://reviews.apache.org/r/34789/diff/8/?file=980075#file980075line797>
> >
> >     Hmm, seekToBeginning() is supposed to be a blocking call. Basically, at
> >     the end of the call, we expect the fetch offset to be set to the beginning.
> >     This is now changed to async, which doesn't match the intended behavior. We
> >     need to think through if this matters or not.
> >
> >     Ditto for seekToEnd().
>
> Jason Gustafson wrote:
>     Since we always update fetch positions before a new fetch and in
>     position(), it didn't seem necessary to make it synchronous. I thought this
>     handling might be more consistent with how new subscriptions are handled
>     (which are asynchronous and defer the initial offset fetch until the next
>     poll or position). That being said, I don't have a strong feeling about it,
>     so we could return to the blocking version.
>
> Jun Rao wrote:
>     Making this async may be fine. One implication is that if we call
>     position() immediately after seekToBeginning(), we may not be able to get
>     the correct offset.
>
> Jason Gustafson wrote:
>     We should be able to get the right offset since we always update offsets
>     before returning the current position, but we might have to block for it.
>     It's similar to calling subscribe(topic) and then trying to get its
>     position immediately.

That may work. However, if one calls seekToBeginning() followed by seekToEnd(),
will we guarantee that position() returns the end offset?


- Jun


-----------------------------------------------------------
This is an automatically generated e-mail.
To reply, visit: https://reviews.apache.org/r/34789/#review87190
-----------------------------------------------------------


On June 5, 2015, 7:45 p.m., Jason Gustafson wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/34789/
> -----------------------------------------------------------
>
> (Updated June 5, 2015, 7:45 p.m.)
>
>
> Review request for kafka.
>
>
> Bugs: KAFKA-2168
>     https://issues.apache.org/jira/browse/KAFKA-2168
>
>
> Repository: kafka
>
>
> Description
> -------
>
> KAFKA-2168; refactored callback handling to prevent unnecessary requests
>
>
> KAFKA-2168; address review comments
>
>
> KAFKA-2168; fix rebase error and checkstyle issue
>
>
> KAFKA-2168; address review comments and add docs
>
>
> KAFKA-2168; handle polling with timeout 0
>
>
> KAFKA-2168; timeout=0 means return immediately
>
>
> Diffs
> -----
>
>   clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 8f587bc0705b65b3ef37c86e0c25bb43ab8803de
>   clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 1ca75f83d3667f7d01da1ae2fd9488fb79562364
>   clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java d1d1ec178f60dc47d408f52a89e52886c1a093a2
>   clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java f50da825756938c193d7f07bee953e000e2627d9
>   clients/src/main/java/org/apache/kafka/clients/consumer/OffsetResetStrategy.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/BrokerResult.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java c1496a0851526f3c7d3905ce4bdff2129c83a6c1
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorResult.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedResult.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java 56281ee15cc33dfc96ff64d5b1e596047c7132a4
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java e7cfaaad296fa6e325026a5eee1aaf9b9c0fe1fe
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java cee75410127dd1b86c1156563003216d93a086b3
>   clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java 677edd385f35d4262342b567262c0b874876d25b
>   clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java b06c4a73e2b4e9472cd772c8bc32bf4a29f431bb
>   clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java 419541011d652becf0cda7a5e62ce813cddb1732
>   clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java ecc78cedf59a994fcf084fa7a458fe9ed5386b00
>   clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java e000cf8e10ebfacd6c9ee68d7b88ff8c157f73c6
>
> Diff: https://reviews.apache.org/r/34789/diff/
>
>
> Testing
> -------
>
>
> Thanks,
>
> Jason Gustafson
>
>
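
On the seekToBeginning()/seekToEnd() ordering question raised in the review above, one way the guarantee could hold is a "last call wins" pending-reset flag that position() resolves lazily. The sketch below is a hypothetical illustration of that idea only. It is not the SubscriptionState or KafkaConsumer code in this patch, all names are invented, and the fixed beginning/end offsets stand in for what would really be a blocking offset lookup against the broker.

```java
// Hypothetical single-partition model of lazily resolved seeks.
class LazySeekSketch {
    enum Reset { EARLIEST, LATEST }

    private final long beginningOffset; // stand-in for the broker's earliest offset
    private final long endOffset;       // stand-in for the broker's latest offset
    private long position;              // current fetch position
    private Reset pendingReset;         // non-null while a seek is still unresolved

    LazySeekSketch(long beginningOffset, long endOffset) {
        this.beginningOffset = beginningOffset;
        this.endOffset = endOffset;
        this.position = beginningOffset;
    }

    // Both seeks only record intent; a later call overwrites an earlier one.
    void seekToBeginning() { pendingReset = Reset.EARLIEST; }
    void seekToEnd()       { pendingReset = Reset.LATEST; }

    // Resolves any pending reset before returning, so the most recent
    // seek decides the result. In a real client this step could block.
    long position() {
        if (pendingReset != null) {
            position = (pendingReset == Reset.EARLIEST) ? beginningOffset : endOffset;
            pendingReset = null;
        }
        return position;
    }

    public static void main(String[] args) {
        LazySeekSketch partition = new LazySeekSketch(0L, 42L);
        partition.seekToBeginning();
        partition.seekToEnd();
        System.out.println(partition.position());
    }
}
```

Under this model the answer to the question is yes, because only the most recent reset request survives until position() is called. Whether the patch behaves the same way depends on how it stores the pending reset per partition.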