Thanks for digging into the background. I think it would be good to get
feedback from people who work on clients, too.
-Matthias

On 5/19/19 12:58 PM, Dongjin Lee wrote:
> Hi Matthias,
>
> I investigated the inconsistencies between the `close` semantics of
> `Producer`, `Consumer`, and `AdminClient`, and I found that this
> inconsistency was intended. Here are the details:
>
> The current default timeout of `KafkaConsumer#close`, 30 seconds, was
> introduced in KIP-102 (0.10.2.0)[^1]. According to the document, there
> are two differences between `Consumer` and `Producer`:
>
> 1. `Consumer`s don't have large requests.[^2]
> 2. `Consumer#close` is affected by the consumer coordinator, whose
>    close operation is affected by `request.timeout.ms`.
>
> For these reasons, the Consumer's default timeout was set a little
> differently.[^3] (This was done by Rajini.)
>
> In the initial proposal, I proposed changing the default timeout of
> `[Producer, AdminClient]#close` from `Long.MAX_VALUE` to another
> value. However, since it is now clear that the current implementation
> is entirely reasonable, *it seems more suitable to change the approach
> to simply providing a close timeout to the clients used by
> KafkaStreams.*[^4] This approach has the following advantages:
>
> 1. The problem described in KAFKA-7996 is resolved, since the Producer
>    no longer hangs during its `close` operation.
> 2. We don't have to change the semantics of `Producer#close`,
>    `AdminClient#close`, or `KafkaStreams#close`. As you pointed out,
>    these kinds of changes are hard for users to reason about.
>
> What do you think?
>
> Thanks,
> Dongjin
>
> [^1]: https://cwiki.apache.org/confluence/display/KAFKA/KIP-102+-+Add+close+with+timeout+for+consumers
> [^2]: "The existing close() method without a timeout will attempt to
> close the consumer gracefully with a default timeout of 30 seconds.
> This is different from the producer default of Long.MAX_VALUE since
> consumers don't have large requests."
> [^3]: The 'Rejected Alternatives' section explains it.
> [^4]: In the case of the Streams reset tool, `KafkaAdminClient`'s
> close timeout is 60 seconds (KIP-198):
> https://github.com/apache/kafka/pull/3927/files
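For reference, the per-client bounded close that Dongjin describes might
look like the sketch below. This is only an illustration: the 30-second
value is arbitrary (not something KIP-459 specifies), and it assumes the
`close(Duration)` overloads the 2.x clients already expose (KIP-266, and
`AdminClient#close(Duration)` since 2.2).

    import java.time.Duration;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;
    import org.apache.kafka.common.serialization.ByteArraySerializer;

    public class BoundedCloseSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");

            KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(
                    props, new ByteArraySerializer(), new ByteArraySerializer());

            Properties consumerProps = new Properties();
            consumerProps.putAll(props);
            consumerProps.put("group.id", "sketch-group");  // hypothetical group id
            KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(
                    consumerProps,
                    new ByteArrayDeserializer(), new ByteArrayDeserializer());

            AdminClient admin = AdminClient.create(props);

            // Illustrative bound. Today consumer.close() already defaults to
            // 30 seconds (KIP-102), while producer.close() and admin.close()
            // default to Long.MAX_VALUE.
            Duration closeTimeout = Duration.ofSeconds(30);

            producer.close(closeTimeout);  // bounded instead of Long.MAX_VALUE
            consumer.close(closeTimeout);  // same bound as its default today
            admin.close(closeTimeout);     // bounded instead of Long.MAX_VALUE
        }
    }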
> On Fri, Apr 26, 2019 at 5:16 PM Matthias J. Sax <matth...@confluent.io>
> wrote:
>
>> Thanks for the KIP.
>>
>> Overall, I agree with the sentiment of the KIP. The current semantics
>> of `KafkaStreams#close(timeout)` are not well defined. Also, the
>> general client inconsistencies are annoying.
>>
>>> This KIP makes no changes to public interfaces; however, it makes a
>>> subtle change to the existing APIs' semantics. If this KIP is
>>> accepted, documenting these semantics in as much detail as possible
>>> may be much better.
>>
>> I am not sure I would call this change "subtle". It might actually
>> have a rather big impact, and hence I am also wondering about
>> backward compatibility (details below). Overall, I am not sure that
>> documenting the change would be sufficient.
>>
>>> Change the default close timeout of Producer and AdminClient into a
>>> more reasonable one, not Long.MAX_VALUE.
>>
>> Can you be more specific than "more reasonable" and propose a
>> concrete value? What about backward compatibility? Assume an
>> application wants to block forever by default: with this change, it
>> would be required to rewrite code to keep the intended semantics.
>> Hence, the change does not seem to be backward compatible. Also note
>> that a config change would not be sufficient; an actual code change
>> would be required.
>>
>> Also, why not go the other direction and default
>> `KafkaConsumer#close()` to use Long.MAX_VALUE, too? Note that the
>> current KafkaStreams#close() also uses Long.MAX_VALUE (i.e., over all
>> four clients, it's 50:50). Of course, a similar backward
>> compatibility concern arises.
>>
>> Making close() blocking by default seems not unreasonable per se. Can
>> you elaborate on why Long.MAX_VALUE is "bad" compared to, e.g., 30
>> seconds?
>>
>>> If succeeded, simply return; if not, close remaining resources with
>>> the default close timeout.
>>
>> Why do you want to apply the default timeout as a fallback? This
>> would violate the user's intention, too, and thus might result in a
>> situation that is not much better than the current one.
>>
>> For KafkaStreams, if there are multiple StreamThreads, would the full
>> timeout be passed to each thread, as all of them could shut down in
>> parallel? Or would the timeout be divided over all threads?
>>
>> What about the case where there is a different number of clients? For
>> example, with EOS enabled, there are multiple Producers that need to
>> be closed, but the user might not even be aware of the increased
>> number of producers (or not know how many there actually are).
>>
>> It seems hard for users to reason about those dependencies.
>>
>> -Matthias
>>
>> On 4/23/19 6:13 PM, Dongjin Lee wrote:
>>> Hi dev,
>>>
>>> I would like to start the discussion of KIP-459: Improve
>>> KafkaStreams#close
>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-459%3A+Improve+KafkaStreams%23close>.
>>> This proposal originated from an issue reported via the community
>>> Slack, KAFKA-7996 <https://issues.apache.org/jira/browse/KAFKA-7996>.
>>> In short, this KIP proposes to resolve the problem by improving the
>>> existing APIs' semantics, without adding any public API changes.
>>>
>>> Please have a look when you are free. All opinions will be highly
>>> appreciated.
>>>
>>> Thanks,
>>> Dongjin
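On the multiple-StreamThreads question above: if the timeout is treated
as one total budget instead of a per-thread allowance, a common pattern
is to track a single deadline and give each successive join only the
time remaining. A rough sketch, using plain `Thread`s as hypothetical
stand-ins for StreamThreads (this is not how KafkaStreams is
implemented, just an illustration of the trade-off):

    import java.util.List;

    public class DeadlineCloseSketch {
        // Returns true only if every thread stopped within the shared budget.
        static boolean closeAll(List<Thread> threads, long timeoutMs)
                throws InterruptedException {
            long deadline = System.currentTimeMillis() + timeoutMs;
            for (Thread t : threads) {
                long remaining = deadline - System.currentTimeMillis();
                if (remaining <= 0) {
                    return false;      // budget exhausted before this thread
                }
                t.join(remaining);     // wait only for the time left
                if (t.isAlive()) {
                    return false;      // this thread missed the deadline
                }
            }
            return true;
        }
    }

The alternative (passing the full timeout to each thread) can overshoot
the caller's budget by up to a factor of the thread count, which is
exactly the ambiguity the question points at.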