> On May 31, 2015, 9:10 p.m., Guozhang Wang wrote: > > Thanks for the explanation Ewen. I agree that a delayed scheduler would be > > a good fit here, but was originally more concerned about the complexity we > > introduced by adding two queues (one for delayed actions and another for > > handling order-preserving commits). After thinking about it a bit more, I > > feel the complexity mainly comes from the place where we need to make > > blocking calls, that involve also triggering the delayed tasks' poll: > > > > > > 1. Consumer.poll() where we will "block" until the timeout has elapsed. > > 2. Consumer.awaitMetadataUpdate() where we block until metadata refreshed. > > 3. CommitOffsetHandler.poll() where we block until this request completed > > via client.completeAll(). > > > > Basically we need to remember each of those places and make sure delayed > > tasks gets polled also while we are blocking. So I am wondering if we could > > refactor this patch a bit as: > > > > 1. Move DelayedTask / DelayedTaskQueue class to o.a.k.common. > > > > 2. Add the delayedTask to KafkaClient with a new API along side with > > send(); more specifically we can: > > a. Rename send() to scheduleOnce(request), which queue up the given > > request to be sent in the next poll"; > > b. Add scheduleRecurring(request, interval), which "triggers scheduleOnce > > every interval". > > c. In poll(), check whether we should schedule a request via ScheduleOnce > > as we did in this patch. > > > > As for the commitOffsetRequests queue and the #.retry config, if we are > > expecting in the future some more requests like the sync commit will be > > added to the consumer, we may want to make them more general, for example > > making them as ConsumerConfig.RETRIES_CONFIG like > > ProducerConfig.RETRIES_CONFIG, and Queue<RequestCompletionHandler> > > scheduledRequests.
Also I am curious how KAFKA-2168 will be leveraging this patch to add the wakeup call, could you elaborate a bit? - Guozhang ----------------------------------------------------------- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33196/#review85920 ----------------------------------------------------------- On May 29, 2015, 6:11 p.m., Ewen Cheslack-Postava wrote: > > ----------------------------------------------------------- > This is an automatically generated e-mail. To reply, visit: > https://reviews.apache.org/r/33196/ > ----------------------------------------------------------- > > (Updated May 29, 2015, 6:11 p.m.) > > > Review request for kafka. > > > Bugs: KAFKA-2123 > https://issues.apache.org/jira/browse/KAFKA-2123 > > > Repository: kafka > > > Description > ------- > > KAFKA-2123: Add queuing of offset commit requests. > > > KAFKA-2123: Add scheduler for delayed tasks in new consumer, add backoff for > commit retries, and simplify auto commit by using delayed tasks. > > > KAFKA-2123: Make synchronous offset commits wait for previous requests to > finish in order. > > > KAFKA-2123: Remove redundant calls to ensureNotClosed > > > KAFKA-2123: Address review comments. > > > Diffs > ----- > > clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java > 8f587bc0705b65b3ef37c86e0c25bb43ab8803de > > clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerCommitCallback.java > PRE-CREATION > clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java > bdff518b732105823058e6182f445248b45dc388 > clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java > d301be4709f7b112e1f3a39f3c04cfa65f00fa60 > clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java > f50da825756938c193d7f07bee953e000e2627d9 > > clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java > b2764df11afa7a99fce46d1ff48960d889032d14 > > clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTask.java > PRE-CREATION > > clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueue.java > PRE-CREATION > > clients/src/main/java/org/apache/kafka/common/errors/ConsumerCoordinatorNotAvailableException.java > PRE-CREATION > > clients/src/main/java/org/apache/kafka/common/errors/NotCoordinatorForConsumerException.java > PRE-CREATION > > clients/src/main/java/org/apache/kafka/common/errors/OffsetLoadInProgressException.java > PRE-CREATION > clients/src/main/java/org/apache/kafka/common/protocol/Errors.java > 5b898c8f8ad5d0469f469b600c4b2eb13d1fc662 > > clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java > b06c4a73e2b4e9472cd772c8bc32bf4a29f431bb > > clients/src/test/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueueTest.java > PRE-CREATION > core/src/test/scala/integration/kafka/api/ConsumerTest.scala > a1eed965a148eb19d9a6cefbfce131f58aaffc24 > > Diff: https://reviews.apache.org/r/33196/diff/ > > > Testing > ------- > > > Thanks, > > Ewen Cheslack-Postava > >
