[ 
https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14643418#comment-14643418
 ] 

Jiangjie Qin commented on KAFKA-2350:
-------------------------------------

[~hachikuji], I am with [~guozhang] that it is much clear if we only support 
either auto partition assignment or manual partition assignment, but not mixed 
mode. I assumed pause/unpause will only be used when auto partition assignment 
is used, so we can check the assigned partition set. If it is under manual 
partition assignment, for the seek(), I assume that you will start consume 
after seek? If so, it might be the same as:
{code}
subscribe(tp)
seek(tp, offset)
poll()
{code}

[~guozhang], I see your point. I am convinced that for people who are using 
auto partition assignment, pause/unpause is more intuitive than using partition 
level sub/unsub. I am not really oppose to having them. What I was worrying is 
that we are adding methods that are intuitive to some particular use case, but 
potentially open the door for adding APIs that only have subtle differences. If 
we take a closer look at our API, there are potentially some other cases we can 
argue for new API. e.g. user might want to have auto commit turn on only for 
some but not all of the partitions they subscribed to. User might want to find 
a list of offsets of a partition within a time range. These are all different 
use cases, but likely can be solved with some lower level API calls instead of 
having a dedicate intuitive API for each of them.

I kind of feel the dilemma we are facing now is that in new consumer, we try to 
address both the high level consumer and low level consumer use cases. 
pause/unpause looks to me a medium-to-low level use case. As the higher level 
requirements can vary a lot and have subtle difference from one to another, the 
question to be answered is that should we expose a high level interface for 
each of the high level use case? Or should we just ask user to use a lower 
level API as long as we support the functionality.

My understanding is that for high level consumer use cases, hopefully we don't 
need user to care too much about the underlying mechanism. For people who wants 
to deal with lower level concept such as offsets, partition assignment, 
temporary consumption suspension, instead of having on high level API written 
for each of the use cases, letting user use a lower level API makes sense to me.

In terms of the example you mentioned, can we solve them by throwing 
appropriate exceptions?

{code}
// Auto partition assignment
subscribe(topic1) // assuming only topic1-partition0 is assigned to this 
consumer.
subscribe(topic1-partition1) // throw IllegalStateException(Cannot subscribe to 
topic1-partition1 because topic1 is managed by consumer coordinator)
unsubscribe(topic1-partition1) // throw IllegalStateException(Topic1-partiion1 
is managed by consumer coordinator, and topic1-partition1 is not assigned to 
this consumer.)
{code}

{code}
subscribe(topic1-partition0)
subscribe(topic1) // throw IllegalStateException(Cannot subscribe to topic1 
because the assignment of topic1 is manually managed)
unsubscribe(topic1-partition1) // throw IllegalStateException(Cannot 
unsubscribe topic1-partition1 because it is not subscribed)
{code}

[~nehanarkhede], what you said makes a lot of sense. I guess I'm just looking 
at the problem from a different angle - user either wants to consume from a 
topic or not at a certain point, whether temporarily or permanently. So the 
state I see is only CONSUME/NOT_CONSUME. The PAUSE state for consumer would be 
pretty much the same as NOT_CONSUME. I might have missed some use case like 
[~hachikuji] mentioned - seek() on a PAUSE partition, but that can be solved by 
calling subscribe() first.

[~gwenshap], good point about heartbeat. We actually got some feedback from 
users in LinkedIn and found that putting the responsibility of sending 
heartbeat on user might be a problem in the first place... We may have 
pause/unpause as a workaround, but the ultimate issue is that maybe we are 
asking too much from user to maintain the heartbeat...

> Add KafkaConsumer pause capability
> ----------------------------------
>
>                 Key: KAFKA-2350
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2350
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Jason Gustafson
>            Assignee: Jason Gustafson
>
> There are some use cases in stream processing where it is helpful to be able 
> to pause consumption of a topic. For example, when joining two topics, you 
> may need to delay processing of one topic while you wait for the consumer of 
> the other topic to catch up. The new consumer currently doesn't provide a 
> nice way to do this. If you skip calls to poll() or if you unsubscribe, then 
> a rebalance will be triggered and your partitions will be reassigned to 
> another consumer. The desired behavior is instead that you keep the partition 
> assigned and simply 
> One way to achieve this would be to add two new methods to KafkaConsumer:
> {code}
> void pause(TopicPartition... partitions);
> void resume(TopicPartition... partitions);
> {code}
> Here is the expected behavior of pause/resume:
> * When a partition is paused, calls to KafkaConsumer.poll will not initiate 
> any new fetches for that partition.
> * After the partition is resumed, fetches will begin again. 
> * While a partition is paused, seek() and position() can still be used to 
> advance or query the current position.
> * Rebalance does not preserve pause/resume state.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to