Thanks for digging into the background.

I think it would be good to get feedback from people who work on
clients, too.


-Matthias


On 5/19/19 12:58 PM, Dongjin Lee wrote:
> Hi Matthias,
> 
> I investigated the inconsistencies between the `close` semantics of
> `Producer`, `Consumer`, and `AdminClient`, and found that this
> inconsistency was intended. Here are the details:
> 
> The current default timeout of `KafkaConsumer#close`, 30 seconds, was
> introduced in KIP-102 (0.10.2.0)[^1]. According to the document, there are
> two differences between `Consumer` and `Producer`:
> 
> 1. `Consumer`s don't have large requests.[^2]
> 2. `Consumer#close` is affected by the consumer coordinator, whose close
> operation is bounded by `request.timeout.ms`.
> 
> For these reasons, the Consumer's default timeout was set slightly
> differently.[^3] (This was done by Rajini.)
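> 
> To make the asymmetry concrete, here is a minimal sketch in Java (assuming
> already-constructed `consumer` and `producer` instances):
> 
>     import java.time.Duration;
> 
>     // Consumer: the no-arg close waits at most 30 seconds (KIP-102).
>     consumer.close();
> 
>     // Producer: the no-arg close waits up to Long.MAX_VALUE ms, i.e.
>     // effectively forever, until outstanding sends complete.
>     producer.close();
> 
>     // The caller must pass an explicit bound to avoid blocking forever:
>     producer.close(Duration.ofSeconds(30));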
> 
> In the initial proposal, I suggested changing the default timeout of
> `Producer#close` and `AdminClient#close` from `Long.MAX_VALUE` to another
> value. However, since it is now clear that the current implementation is
> entirely reasonable, *it seems more suitable to change the approach to
> simply providing a close timeout to the clients used by KafkaStreams.*[^4]
> (A rough sketch follows below the list of advantages.)
> This approach has the following advantages:
> 
> 1. The problem described in KAFKA-7996 is now resolved, since the Producer
> no longer hangs during its `close` operation.
> 2. We don't have to change the semantics of `Producer#close`,
> `AdminClient#close`, or `KafkaStreams#close`. As you pointed out, these
> kinds of changes are hard for users to reason about.
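> 
> A rough sketch of this direction (everything below is a hypothetical
> illustration, not actual KafkaStreams code; the constant, method, and
> field names are made up):
> 
>     import java.time.Duration;
> 
>     // Hypothetical: KafkaStreams bounds the close of its internally
>     // managed clients instead of relying on the clients' own defaults
>     // (Long.MAX_VALUE for Producer and AdminClient).
>     private static final Duration CLIENT_CLOSE_TIMEOUT = Duration.ofSeconds(30);
> 
>     private void closeInternalClients() {
>         producer.close(CLIENT_CLOSE_TIMEOUT);    // bounded, no more hang
>         adminClient.close(CLIENT_CLOSE_TIMEOUT);
>         consumer.close(CLIENT_CLOSE_TIMEOUT);
>     }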
> 
> What do you think?
> 
> Thanks,
> Dongjin
> 
> [^1]:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-102+-+Add+close+with+timeout+for+consumers
> [^2]: "The existing close() method without a timeout will attempt to close
> the consumer gracefully with a default timeout of 30 seconds. This is
> different from the producer default of Long.MAX_VALUE since consumers don't
> have large requests."
> [^3]: The 'Rejected Alternatives' section of KIP-102 explains it.
> [^4]: In the case of the Streams reset tool, `KafkaAdminClient`'s close
> timeout is 60 seconds (KIP-198): https://github.com/apache/kafka/pull/3927/files
> 
> On Fri, Apr 26, 2019 at 5:16 PM Matthias J. Sax <matth...@confluent.io>
> wrote:
> 
>> Thanks for the KIP.
>>
>> Overall, I agree with the sentiment of the KIP. The current semantics of
>> `KafkaStreams#close(timeout)` are not well defined. Also the general
>> client inconsistencies are annoying.
>>
>>
>>> This KIP makes no changes to public interfaces; however, it makes a
>>> subtle change to the existing API's semantics. If this KIP is accepted,
>>> documenting these semantics with as much detail as possible would be much
>>> better.
>>
>> I am not sure I would call this change "subtle". It might actually have
>> rather big impact, and hence I am also wondering about backward
>> compatibility (details below). Overall, I am not sure if documenting the
>> change would be sufficient.
>>
>>
>>
>>> Change the default close timeout of Producer and AdminClient into a more
>>> reasonable one, not Long.MAX_VALUE.
>>
>> Can you be more specific than "more reasonable" and propose a concrete
>> value? What about backward compatibility? Assume an application wants to
>> block forever by default: with this change, it would be required to
>> rewrite code to keep the intended semantics. Hence, the change does not
>> seem to be backward compatible. Also note that a config change would not
>> be sufficient, but an actual code change would be required.
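>>
>> For example (a sketch, assuming application code that relies on today's
>> default):
>>
>>     // Today: blocks until all in-flight requests complete.
>>     producer.close();
>>
>>     // After the proposed change, keeping the block-forever intent would
>>     // require an explicit code change, eg:
>>     producer.close(Duration.ofMillis(Long.MAX_VALUE));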
>>
>> Also, why not go the other direction and default `KafkaConsumer#close()`
>> to Long.MAX_VALUE, too? Note that the current KafkaStreams#close() also
>> uses Long.MAX_VALUE (ie, over all 4 clients, it's 50:50). Of course, a
>> similar backward compatibility concern arises.
>>
>> Making close() blocking by default seems not unreasonable per se. Can
>> you elaborate on why Long.MAX_VALUE is "bad" compared to, eg, 30 seconds?
>>
>>
>>> If it succeeds, simply return; if not, close the remaining resources
>>> with the default close timeout.
>>
>> Why do you want to apply the default timeout as a fallback? This would
>> violate the user's intention, too, and thus might result in a situation
>> that is not much better than the current one.
>>
>>
>> For KafkaStreams, if there are multiple StreamThreads, would the full
>> timeout be passed to each thread, since all of them could shut down in
>> parallel? Or would the timeout be divided over all threads?
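>>
>> For illustration, the two options could look roughly like this
>> (hypothetical sketch; `threads`, `shutdown()`, and the join logic are
>> made up for the example):
>>
>>     // Option 1: signal all threads first, then give every join the full
>>     // budget. Since the threads shut down in parallel, the total wait is
>>     // usually close to one timeout, but not strictly bounded by it.
>>     for (StreamThread thread : threads) {
>>         thread.shutdown();
>>     }
>>     for (StreamThread thread : threads) {
>>         thread.join(timeout.toMillis());
>>     }
>>
>>     // Option 2: a shared absolute deadline; each join only gets the
>>     // remaining time (min 1 ms, since join(0) waits forever).
>>     final long deadline = System.currentTimeMillis() + timeout.toMillis();
>>     for (StreamThread thread : threads) {
>>         thread.join(Math.max(1, deadline - System.currentTimeMillis()));
>>     }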
>>
>> What about the case when there is a different number of clients? For
>> example, with EOS enabled, there are multiple Producers that need to be
>> closed; however, the user might not even be aware of the increased
>> number of producers (or not know how many there actually are).
>>
>>
>> It seems to be hard for users to reason about those dependencies.
>>
>>
>> -Matthias
>>
>>
>> On 4/23/19 6:13 PM, Dongjin Lee wrote:
>>> Hi dev,
>>>
>>> I would like to start the discussion of KIP-459: Improve KafkaStreams#close
>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-459%3A+Improve+KafkaStreams%23close>.
>>> This proposal originated from the issue reported via community Slack,
>>> KAFKA-7996 <https://issues.apache.org/jira/browse/KAFKA-7996>. In short,
>>> this KIP proposes to resolve this problem by improving the existing API's
>>> semantics, not by adding any public API changes.
>>>
>>> Please have a look when you are free. All opinions will be highly
>>> appreciated.
>>>
>>> Thanks,
>>> Dongjin
>>>
>>
>>
> 
