[
https://issues.apache.org/jira/browse/KAFKA-1780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14214897#comment-14214897
]
Ewen Cheslack-Postava commented on KAFKA-1780:
----------------------------------------------
The attached patch adds NonBlockingIteratorTemplate, a subclass of
IteratorTemplate, and makes ConsumerIterator extend it. A few notes:
* I moved peek() and added poll() to a subclass of IteratorTemplate because not
all implementations of IteratorTemplate can implement them correctly.
* The logic to figure out timeouts and throw ConsumerTimeoutExceptions is now a
bit confusing because there are now two ways of setting timeouts. It would have
been nicer to offer only peek()/poll() in the first place.
* Adjusted the constraints on the IteratorTemplate/NonBlockingIteratorTemplate
item type. The code already assumed nullable types, but didn't enforce it. In
fact, the IteratorTemplateTest used a non-nullable type and only happened to
work. As far as I can tell, adding this restriction has no negative effects.
The alternative, leaving the bound off and using null.asInstanceOf[T] where
necessary, permits invalid iterator implementations.
* Removed peek() from IteratorTemplate. It wasn't implemented correctly anyway
and isn't used anywhere in the Kafka code. However, it is a public interface,
so I'm not sure if we should actually remove it.
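
The split described in these notes could look roughly like the following. This
is a simplified sketch, not the code in the attached patch: BlockingTemplate,
NonBlockingTemplate, QueueBackedIterator and the timed makeNext(timeoutMs) hook
are hypothetical names, and a LinkedBlockingQueue stands in for the consumer's
internal data source. It assumes that null is the "nothing available" marker,
which is why the item type carries the T >: Null bound.

```scala
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}

// Simplified stand-in for a blocking iterator template (not Kafka's class).
// The bound T >: Null lets null mean "no item" without null.asInstanceOf[T].
abstract class BlockingTemplate[T >: Null] extends Iterator[T] {
  protected var nextItem: T = null

  // Blocks until an item is available.
  protected def makeNext(): T

  def hasNext: Boolean = {
    if (nextItem == null) nextItem = makeNext()
    nextItem != null
  }

  def next(): T = {
    if (!hasNext) throw new NoSuchElementException("no more items")
    val item = nextItem
    nextItem = null
    item
  }
}

// Only sources that can wait with a bound implement this subclass, so
// peek()/poll() are not forced onto every template implementation.
abstract class NonBlockingTemplate[T >: Null] extends BlockingTemplate[T] {
  // Waits at most timeoutMs for an item; returns null on timeout.
  protected def makeNext(timeoutMs: Long): T

  // Returns the next item without consuming it, or null if none is ready now.
  def peek(): T = {
    if (nextItem == null) nextItem = makeNext(0L)
    nextItem
  }

  // Consumes and returns the next item, or null if none arrives in time.
  def poll(timeoutMs: Long): T = {
    if (nextItem == null) nextItem = makeNext(timeoutMs)
    val item = nextItem
    nextItem = null
    item
  }
}

// Hypothetical implementation backed by a queue, for illustration only.
class QueueBackedIterator[T >: Null](queue: LinkedBlockingQueue[T])
    extends NonBlockingTemplate[T] {
  protected def makeNext(): T = queue.take()
  protected def makeNext(timeoutMs: Long): T =
    queue.poll(timeoutMs, TimeUnit.MILLISECONDS)
}
```

With this shape, peek() and poll(timeout) on an empty queue return null instead
of blocking indefinitely, while next() and hasNext keep their blocking
behaviour. Declaring QueueBackedIterator[Int] fails to compile, because Int does
not satisfy T >: Null.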
> Add peek()/poll() for ConsumerIterator
> --------------------------------------
>
> Key: KAFKA-1780
> URL: https://issues.apache.org/jira/browse/KAFKA-1780
> Project: Kafka
> Issue Type: Bug
> Components: consumer
> Affects Versions: 0.8.1.1
> Reporter: Ewen Cheslack-Postava
> Assignee: Ewen Cheslack-Postava
> Attachments: KAFKA-1780.patch
>
>
> Currently, all consumer operations (next(), hasNext()) block. This is
> problematic for a couple of use cases. Most obviously, a peek() method would
> be nice so you can at least check whether any data is immediately available,
> getting a null value back if it's not.
> A more difficult example is a proxy with a timeout, i.e. it consumes messages
> for up to N ms or M messages, and returns whatever it has at the end of that
> period. It's possible to approximate that with peek(), but doing so requires
> aggressive polling to match the proxy's timeout. A poll(timeout) method would
> allow for a correct implementation: each call to poll() returns a single
> message, and the caller specifies the timeout for that call.
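
The proxy use case in the description (collect for up to N ms or M messages,
then return whatever has arrived) can be sketched with a deadline loop. This is
an illustration under assumptions, not part of the patch: BatchProxy and
consumeBatch are hypothetical names, and the poll argument stands in for the
proposed poll(timeout), assumed to wait up to the given number of milliseconds
and return null if nothing arrives.

```scala
import scala.collection.mutable.ArrayBuffer

object BatchProxy {
  // Collects up to maxMessages items, waiting at most maxWaitMs in total.
  // `poll` is a stand-in for the proposed poll(timeout): it returns null
  // when no message arrives within the given number of milliseconds.
  def consumeBatch[T >: Null](poll: Long => T,
                              maxMessages: Int,
                              maxWaitMs: Long): Seq[T] = {
    val deadlineNs = System.nanoTime() + maxWaitMs * 1000000L
    val batch = ArrayBuffer.empty[T]
    var done = false
    while (!done && batch.size < maxMessages) {
      // Each call waits only for the time left before the overall deadline.
      val remainingMs = (deadlineNs - System.nanoTime()) / 1000000L
      if (remainingMs <= 0) done = true
      else {
        val msg = poll(remainingMs)
        if (msg != null) batch += msg
      }
    }
    batch.toSeq
  }
}
```

Passing the remaining time to each poll call is what removes the need for
aggressive polling: the loop sleeps inside poll until a message arrives or the
deadline passes, instead of repeatedly calling peek().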