> On June 15, 2015, 6:09 p.m., Jun Rao wrote:
> > clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, line 1199
> > <https://reviews.apache.org/r/34789/diff/9/?file=983137#file983137line1199>
> >
> > Doesn't this cause poll() to block for the backoff time?

Generally it wouldn't, but there are a couple of cases where it could. For example, we could be waiting to discover the coordinator, or we might need to fetch some partition offsets. In either case, we could block longer than the poll timeout. Note that this is consistent with the old version. I had hoped to address it as part of solving KAFKA-1894.


> On June 15, 2015, 6:09 p.m., Jun Rao wrote:
> > clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, line 928
> > <https://reviews.apache.org/r/34789/diff/9/?file=983137#file983137line928>
> >
> > Could you explain a bit more how wakeup() can be used for commit
> > offsets or seek to a new position? Does a separate thread have to first
> > save some partitions for committing offsets in the poll thread and then
> > wake it up? Then the poll thread will initiate the offset commit before the
> > next poll request?

It's admittedly a little awkward, but you could do it the way you suggest. For example, you could use a flag and check it in the poll thread:

```
while (true) {
  // set by another thread, which then wakes up the consumer
  if (needCommit.get()) {
    consumer.commit(offsets);
    needCommit.set(false);
  }
  records = consumer.poll(50000);
  // submit records to executor perhaps
}
```

Then the other thread just needs to set the flag and the offsets that need committing, and wake up the consumer.

Alternatively, you could protect the consumer with a lock and commit the offsets from a separate thread. The difficulty with this solution is that you need to make sure that the polling thread does not simply reacquire the lock after being woken up. This problem can be dealt with by introducing a second lock or perhaps by getting tricky with a semaphore, but I'd probably recommend the previous solution instead since it's easier to understand.
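
To make the flag approach a bit more concrete, here is a rough, self-contained sketch showing both threads. Everything in it is an assumption made for illustration: `StubConsumer`, its `poll`/`commit`/`wakeup` methods, and the single `Long` offset are hypothetical stand-ins and not the KafkaConsumer API from this patch. In particular, the stub's `poll` simply returns early when woken up, which may not match how the real consumer reports a wakeup.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical stand-in for the consumer; NOT the real KafkaConsumer API. */
class StubConsumer {
    private final Object monitor = new Object();
    private boolean wakeupRequested = false;
    final List<Long> committed = new ArrayList<>();

    /** Blocks for up to timeoutMs, or returns early once wakeup() has been called. */
    void poll(long timeoutMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        synchronized (monitor) {
            try {
                while (!wakeupRequested) {
                    long remaining = deadline - System.currentTimeMillis();
                    if (remaining <= 0) {
                        return; // plain timeout, nothing woke us up
                    }
                    monitor.wait(remaining);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            wakeupRequested = false; // the wakeup is consumed by this poll
        }
    }

    /** Safe to call from any thread; sticky until the next poll() sees it. */
    void wakeup() {
        synchronized (monitor) {
            wakeupRequested = true;
            monitor.notifyAll();
        }
    }

    /** Only ever called from the polling thread in this sketch. */
    void commit(long offset) {
        committed.add(offset);
    }
}

class WakeupCommitSketch {
    static List<Long> run() {
        StubConsumer consumer = new StubConsumer();
        AtomicBoolean needCommit = new AtomicBoolean(false);
        AtomicReference<Long> offsets = new AtomicReference<>();
        AtomicBoolean running = new AtomicBoolean(true);

        // Polling thread: the only thread that touches poll() and commit().
        Thread poller = new Thread(() -> {
            while (running.get() && !Thread.currentThread().isInterrupted()) {
                if (needCommit.compareAndSet(true, false)) {
                    consumer.commit(offsets.get());
                }
                consumer.poll(50000);
            }
        });
        poller.setDaemon(true);
        poller.start();

        // Other thread (here: the caller): publish offsets, set the flag, wake up.
        offsets.set(42L);
        needCommit.set(true);
        consumer.wakeup();

        try {
            while (needCommit.get()) {
                Thread.sleep(1); // wait until the poller has picked up the request
            }
            running.set(false);
            consumer.wakeup(); // second wakeup is only for shutdown
            poller.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return consumer.committed;
    }

    public static void main(String[] args) {
        System.out.println("committed offsets: " + run());
    }
}
```

The sketch uses `compareAndSet` rather than a separate `get()` and `set(false)` so that a commit request arriving between the two calls is not lost, and it publishes the offsets before setting the flag so the polling thread never sees the flag without the data.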


> On June 15, 2015, 6:09 p.m., Jun Rao wrote:
> > clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, lines 686-687
> > <https://reviews.apache.org/r/34789/diff/9/?file=983137#file983137line686>
> >
> > It's not very clear to me if the caller is woken up from a poll() call,
> > whether the next poll() call will resume from the exact same state where
> > the previous one left off.
> >
> > For example, let's say the first poll() call just issued a
> > consumerMetadataRequest in ensureCoordinatorKnown() and then is woken up.
> > In the next poll() call, do we just wait for the response from the previous
> > consumerMetadataRequest or will it trigger a new consumerMetadataRequest?
> > If it's the latter, will the two responses be processed out of order?

Waking up the consumer can leave requests pending. This could also happen if we change the code to actually respect the poll timeout (and I think this is the main reason why it hasn't been done). I was thinking about wakeup() as primarily a way to get timely shutdown behavior, but if it is also used for commits or seeks, then we have to solve this problem.

I have thought about adding a cancel() operation to the RequestFuture. It wouldn't stop an in-flight request, but you could prevent its callback from being executed. This would probably be good enough to deal with a pending consumerMetadataRequest, but we wouldn't want to leave a JoinGroup request pending, since resending it may cause unnecessary rebalancing. Perhaps we shouldn't allow wakeups in those cases? Maybe the only case where we actually wake up is when we are polling for data?


- Jason


-----------------------------------------------------------
This is an automatically generated e-mail.
To reply, visit: https://reviews.apache.org/r/34789/#review87880
-----------------------------------------------------------


On June 11, 2015, 9:10 p.m., Jason Gustafson wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/34789/
> -----------------------------------------------------------
>
> (Updated June 11, 2015, 9:10 p.m.)
>
>
> Review request for kafka.
>
>
> Bugs: KAFKA-2168
>     https://issues.apache.org/jira/browse/KAFKA-2168
>
>
> Repository: kafka
>
>
> Description
> -------
>
> KAFKA-2168; refactored callback handling to prevent unnecessary requests
>
>
> KAFKA-2168; address review comments
>
>
> KAFKA-2168; fix rebase error and checkstyle issue
>
>
> KAFKA-2168; address review comments and add docs
>
>
> KAFKA-2168; handle polling with timeout 0
>
>
> KAFKA-2168; timeout=0 means return immediately
>
>
> KAFKA-2168; address review comments
>
>
> Diffs
> -----
>
>   clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 8f587bc0705b65b3ef37c86e0c25bb43ab8803de
>   clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 1ca75f83d3667f7d01da1ae2fd9488fb79562364
>   clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java d1d1ec178f60dc47d408f52a89e52886c1a093a2
>   clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java f50da825756938c193d7f07bee953e000e2627d9
>   clients/src/main/java/org/apache/kafka/clients/consumer/OffsetResetStrategy.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java c1496a0851526f3c7d3905ce4bdff2129c83a6c1
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java 56281ee15cc33dfc96ff64d5b1e596047c7132a4
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java e7cfaaad296fa6e325026a5eee1aaf9b9c0fe1fe
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java cee75410127dd1b86c1156563003216d93a086b3
>   clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java 677edd385f35d4262342b567262c0b874876d25b
>   clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java b06c4a73e2b4e9472cd772c8bc32bf4a29f431bb
>   clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java 419541011d652becf0cda7a5e62ce813cddb1732
>   clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java ecc78cedf59a994fcf084fa7a458fe9ed5386b00
>   clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java e000cf8e10ebfacd6c9ee68d7b88ff8c157f73c6
>
> Diff: https://reviews.apache.org/r/34789/diff/
>
>
> Testing
> -------
>
>
> Thanks,
>
> Jason Gustafson
>
>