[
https://issues.apache.org/jira/browse/KAFKA-1835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14627279#comment-14627279
]
Guozhang Wang commented on KAFKA-1835:
--------------------------------------
Thanks [~becket_qin] for the summary. Some follow-up comments regarding the new
API:
I think case (3) is only for users who are willing to drop messages if topic
metadata is not present, and hence calling partitionsFor "to get the partitions
info for this topic, but if it is not present, I will just drop the messages
and move forward". To cover this case the following pattern:
{code}
partitionsFor(topic);
send(new Record(topic, key, value))
{code}
is a little bit awkward to me, because since the user does not care if the
topic metadata is really present and is willing to drop messages if not, it
makes little sense to call "partitionsFor" but checks nothing on return values
before send anyways. probably due to its function name. In addition, the
behavior of partitionsFor(empty-list) as proposed in KAFKA-2275 is also not
clear.
Personally I feel it is more intuitive for the user's point of view to define
the following:
1. max.block.ms only controls send() calls, which, depending on its value == 0
or not, throw exceptions immediately or wait on metadata refresh if the topic
metadata is not available.
2. partitionsFor(single-topic) is blocking UNTIL the topic metadata is valid,
this function will ever be called by users of case (1) and (2) only.
* If we decide to let max.block.ms only controls send() calls, we may consider
not making it a config but rather modify the API to send(record, timeout,
callback).
3. for KAFKA-2275 on consumers, still add the listTopics(timeout) API, which is
blocked for at most the timeout value. Consumer's partitionsFor() API will only
ever be called by users who want to consume specific topics and also have
customized partitions management and hence should tolerate blocking as well
(i.e. you cannot really be unblocking at all if you want to consume specific
topic-partitions because you do not know when these partitions will be
available).
> Kafka new producer needs options to make blocking behavior explicit
> -------------------------------------------------------------------
>
> Key: KAFKA-1835
> URL: https://issues.apache.org/jira/browse/KAFKA-1835
> Project: Kafka
> Issue Type: Improvement
> Components: clients
> Affects Versions: 0.8.2.0, 0.8.3, 0.9.0
> Reporter: Paul Pearcy
> Fix For: 0.8.3
>
> Attachments: KAFKA-1835-New-producer--blocking_v0.patch,
> KAFKA-1835.patch
>
> Original Estimate: 504h
> Remaining Estimate: 504h
>
> The new (0.8.2 standalone) producer will block the first time it attempts to
> retrieve metadata for a topic. This is not the desired behavior in some use
> cases where async non-blocking guarantees are required and message loss is
> acceptable in known cases. Also, most developers will assume an API that
> returns a future is safe to call in a critical request path.
> Discussing on the mailing list, the most viable option is to have the
> following settings:
> pre.initialize.topics=x,y,z
> pre.initialize.timeout=x
>
> This moves potential blocking to the init of the producer and outside of some
> random request. The potential will still exist for blocking in a corner case
> where connectivity with Kafka is lost and a topic not included in pre-init
> has a message sent for the first time.
> There is the question of what to do when initialization fails. There are a
> couple of options that I'd like available:
> - Fail creation of the client
> - Fail all sends until the meta is available
> Open to input on how the above option should be expressed.
> It is also worth noting more nuanced solutions exist that could work without
> the extra settings, they just end up having extra complications and at the
> end of the day not adding much value. For instance, the producer could accept
> and queue messages(note: more complicated than I am making it sound due to
> storing all accepted messages in pre-partitioned compact binary form), but
> you're still going to be forced to choose to either start blocking or
> dropping messages at some point.
> I have some test cases I am going to port over to the Kafka producer
> integration ones and start from there. My current impl is in scala, but
> porting to Java shouldn't be a big deal (was using a promise to track init
> status, but will likely need to make that an atomic bool).
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)