Hi,

I have been implementing KIP-219. I discussed the interface changes with
Becket Qin and Dong Lin, and we decided to bump up the protocol version of
ApiVersionsRequest and ApiVersionsResponse only, instead of bumping up all
requests/responses that may be throttled, to indicate to clients whether or
not brokers perform throttling before sending out responses. We think this
is sufficient given that the network client exchanges
ApiVersionsRequest/Response with each broker when establishing a connection
to it, and thus it can detect the broker's throttling behavior just by
examining the ApiVersionsResponse version.
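
To illustrate the idea, here is a minimal sketch (not the actual client
code) of how a client could remember, per broker, whether it has to back
off on its own. The class name, method names, and the version threshold
constant are hypothetical placeholders chosen for this example.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: tracks, per broker, whether the broker sends responses
// before throttling (new behavior) or throttles first (old behavior).
class ThrottleBehaviorTracker {
    // Assumed placeholder: the first ApiVersionsResponse version at which the
    // broker sends the response before throttling. Illustrative value only.
    static final short FIRST_VERSION_RESPONDING_BEFORE_THROTTLE = 2;

    private final Map<String, Boolean> respondsBeforeThrottle = new HashMap<>();

    // Called once per connection, after the ApiVersions exchange completes.
    void onApiVersionsResponse(String brokerId, short responseVersion) {
        respondsBeforeThrottle.put(
            brokerId, responseVersion >= FIRST_VERSION_RESPONDING_BEFORE_THROTTLE);
    }

    // True if the client itself must wait for the throttle time reported in
    // a response. Unknown brokers are treated as old brokers, which have
    // already delayed the response on their side.
    boolean clientMustBackOff(String brokerId) {
        return respondsBeforeThrottle.getOrDefault(brokerId, false);
    }
}
```

With something like this, a new client talking to an old broker would not
wait for the throttle time a second time.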

Please respond to this e-mail if you have any questions or concerns.

Thanks,
Jon Lee


On Thu, Apr 5, 2018 at 2:29 PM, Becket Qin <becket....@gmail.com> wrote:

>
>
> On Thu, Nov 16, 2017 at 3:49 PM, Becket Qin <becket....@gmail.com> wrote:
>
>> Thanks Rajini,
>>
>> I updated the KIP wiki to clarify the scope of the KIP. To summarize, the
>> current quota has a few caveats:
>> 1. The brokers are only throttling the NEXT request even if the current
>> request is already violating quota. So the broker will always process at
>> least one request from the client.
>> 2. The brokers are not able to know the client id until they fully read
>> the request out of the sockets even if that client is being throttled.
>> 3. The brokers are not communicating to the clients promptly, so the
>> clients have to wait blindly and sometimes time out unnecessarily.
>>
>> This KIP only tries to address 3 but not 1 and 2 because A) those two
>> issues are sort of orthogonal to 3 and B) the solutions to 1 and 2 are much
>> more complicated and worth a separate discussion.
>>
>> I'll wait till tomorrow and start a voting thread if there are no further
>> concerns raised about the KIP.
>>
>> Thanks,
>>
>> Jiangjie (Becket) Qin
>>
>> On Thu, Nov 16, 2017 at 11:47 AM, Rajini Sivaram <rajinisiva...@gmail.com
>> > wrote:
>>
>>> Hi Becket,
>>>
>>> The current user quota doesn't solve the problem. But I was thinking that
>>> if we could ensure we don't read more from the network than the quota
>>> allows, we may be able to fix the issue in a different way (throttling all
>>> connections, each for a limited time prior to reading large requests). But
>>> it would be more complex (and even more messy for client-id quotas), so I
>>> can understand why the solution you proposed in the KIP makes sense for the
>>> scenario that you described.
>>>
>>> Regards,
>>>
>>> Rajini
>>>
>>> On Tue, Nov 14, 2017 at 11:30 PM, Becket Qin <becket....@gmail.com>
>>> wrote:
>>>
>>> > Hi Rajini,
>>> >
>>> > We are using SSL so we could use user quota. But I am not sure if that
>>> > would solve the problem. The key issue in our case is that each broker
>>> > can only handle ~300 MB/s of incoming bytes, but the MapReduce job is
>>> > trying to push 1-2 GB/s. Unless we can throttle the clients to 300 MB/s,
>>> > the broker cannot sustain the load. In order to do that, we need to be
>>> > able to throttle requests for more than the request timeout, potentially
>>> > a few minutes. It seems neither user quota nor limited throttle time can
>>> > achieve this.
>>> >
>>> > Thanks,
>>> >
>>> > Jiangjie (Becket) Qin
>>> >
>>> > On Tue, Nov 14, 2017 at 7:44 AM, Rajini Sivaram <
>>> rajinisiva...@gmail.com>
>>> > wrote:
>>> >
>>> > > Hi Becket,
>>> > >
>>> > > For the specific scenario that you described, would it be possible to
>>> > > use user quotas rather than client-id quotas? With user quotas, perhaps
>>> > > we can throttle more easily before reading requests as well (as you
>>> > > mentioned, the difficulty with client-id quota is that we have to read
>>> > > partial requests and handle client-ids at the network layer, making
>>> > > that a much bigger change). If your clients are using SASL/SSL, I was
>>> > > wondering whether a solution that improves user quotas and limits
>>> > > throttle time would work for you.
>>> > >
>>> > > Regards,
>>> > >
>>> > > Rajini
>>> > >
>>> > >
>>> > >
>>> > > On Thu, Nov 9, 2017 at 12:59 AM, Becket Qin <becket....@gmail.com>
>>> > wrote:
>>> > >
>>> > > > Since we will bump up the wire request version, another option is
>>> > > > for the broker to keep the current behavior for clients that are
>>> > > > sending old request versions. For clients sending the new request
>>> > > > versions, the broker can respond and then mute the channel as
>>> > > > described in the KIP wiki. In this case, muting the channel is mostly
>>> > > > for protection purposes. A correctly implemented client should back
>>> > > > off for the throttle time before sending the next request. The
>>> > > > downside is that the broker needs to keep both sets of logic, and it
>>> > > > does not seem to gain much benefit. So personally I prefer to just
>>> > > > mute the channel. But I am open to different opinions.
>>> > > >
>>> > > > Thanks,
>>> > > >
>>> > > > Jiangjie (Becket) Qin
>>> > > >
>>> > > > On Mon, Nov 6, 2017 at 7:28 PM, Becket Qin <becket....@gmail.com>
>>> > wrote:
>>> > > >
>>> > > > > Hi Jun,
>>> > > > >
>>> > > > > Hmm, even if a connection is closed by the client while the channel
>>> > > > > is muted, it seems Selector.select() will detect this and close the
>>> > > > > socket after the channel is unmuted.
>>> > > > > It is true that before the channel is unmuted the socket will be in
>>> > > > > a CLOSE_WAIT state though. So having an arbitrarily long muted
>>> > > > > duration may indeed cause problems.
>>> > > > >
>>> > > > > Thanks,
>>> > > > >
>>> > > > > Jiangjie (Becket) Qin
>>> > > > >
>>> > > > > On Mon, Nov 6, 2017 at 7:22 PM, Becket Qin <becket....@gmail.com
>>> >
>>> > > wrote:
>>> > > > >
>>> > > > >> Hi Rajini,
>>> > > > >>
>>> > > > >> Thanks for the detailed explanation. Please see the reply below:
>>> > > > >>
>>> > > > >> 2. Limiting the throttle time to connections.max.idle.ms on the
>>> > > > >> broker side is probably fine. However, clients may have a different
>>> > > > >> configuration of connections.max.idle.ms and still reconnect before
>>> > > > >> the throttle time (which is the server side
>>> > > > >> connections.max.idle.ms) has passed. It seems to be another back
>>> > > > >> door for quota.
>>> > > > >>
>>> > > > >> 3. I agree we could just mute the server socket until
>>> > > > >> connections.max.idle.ms if the massive CLOSE_WAIT is a big issue.
>>> > > > >> This helps guarantee only connection_rate * connections.max.idle.ms
>>> > > > >> sockets will be in CLOSE_WAIT state. For cooperative clients,
>>> > > > >> unmuting the socket will not have a negative impact.
>>> > > > >>
>>> > > > >> 4. My concern for capping the throttle time to metrics.window.ms is
>>> > > > >> that we will not be able to enforce quota effectively. It might be
>>> > > > >> useful to explain this with a real example we are trying to solve.
>>> > > > >> We have a MapReduce job pushing data to a Kafka cluster. The
>>> > > > >> MapReduce job has hundreds of producers and each of them sends a
>>> > > > >> normal sized ProduceRequest (~2 MB) to each of the brokers in the
>>> > > > >> cluster. Apparently the client id ran out of bytes quota pretty
>>> > > > >> quickly, and the broker started to throttle the producers. The
>>> > > > >> throttle time could actually be pretty long (e.g. a few minutes).
>>> > > > >> At that point, request queue time on the brokers was around 30
>>> > > > >> seconds. After that, a bunch of producers hit request.timeout.ms,
>>> > > > >> reconnected, and sent the next request again, which caused another
>>> > > > >> spike and a longer queue.
>>> > > > >>
>>> > > > >> In the above case, unless we set the quota window to be pretty big,
>>> > > > >> we will not be able to enforce the quota. And if we set the window
>>> > > > >> size to a large value, the request might be throttled for longer
>>> > > > >> than connections.max.idle.ms.
>>> > > > >>
>>> > > > >> > We need a solution to improve flow control for well-behaved
>>> > > > >> > clients which currently rely entirely on broker's throttling. The
>>> > > > >> > KIP addresses this using co-operative clients that sleep for an
>>> > > > >> > unbounded throttle time. I feel this is not ideal since the result
>>> > > > >> > is traffic with a lot of spikes. Feedback from brokers to enable
>>> > > > >> > flow control in the client is a good idea, but clients with
>>> > > > >> > excessive throttle times should really have been configured with
>>> > > > >> > smaller batch sizes.
>>> > > > >>
>>> > > > >> This is not really about a single producer with a large size; it is
>>> > > > >> a lot of small producers talking to the brokers at the same time.
>>> > > > >> Reducing the batch size does not help much here. Also note that
>>> > > > >> after the spike in traffic at the very beginning, the throttle
>>> > > > >> times of the ProduceRequests processed later are actually going to
>>> > > > >> be increasing (for example, the first throttled request will be
>>> > > > >> throttled for 1 second, the second throttled request will be
>>> > > > >> throttled for 10 sec, etc.). Due to the throttle time variation, if
>>> > > > >> every producer honors the throttle time, there will not be another
>>> > > > >> spike after the first produce.
>>> > > > >>
>>> > > > >> > We need a solution to enforce smaller quotas to protect the broker
>>> > > > >> > from misbehaving clients. The KIP addresses this by muting
>>> > > > >> > channels for an unbounded time. This introduces problems of
>>> > > > >> > channels in CLOSE_WAIT. And doesn't really solve all issues with
>>> > > > >> > misbehaving clients since new connections can be created to bypass
>>> > > > >> > quotas.
>>> > > > >>
>>> > > > >> Our current quota only protects against cooperating clients because
>>> > > > >> our quota is really throttling the NEXT request after processing a
>>> > > > >> request, even if this request itself has already violated quota.
>>> > > > >> The broker is not protected from misbehaving clients at all with
>>> > > > >> the current quota mechanism. Like you mentioned, a connection quota
>>> > > > >> is required. We have been discussing this at LinkedIn for some
>>> > > > >> time. Doing it right requires some major changes such as partially
>>> > > > >> reading a request to identify the client id at network level and
>>> > > > >> disconnecting misbehaving clients.
>>> > > > >>
>>> > > > >> While handling misbehaving clients is important, we are not trying
>>> > > > >> to address that in this KIP. This KIP is trying to improve the
>>> > > > >> communication with good clients. Muting the channel is more of a
>>> > > > >> migration plan so that we don't have a regression on the old
>>> > > > >> innocent (but good) clients.
>>> > > > >>
>>> > > > >> Thanks,
>>> > > > >>
>>> > > > >> Jiangjie (Becket) Qin
>>> > > > >>
>>> > > > >>
>>> > > > >> On Mon, Nov 6, 2017 at 1:33 PM, Jun Rao <j...@confluent.io>
>>> wrote:
>>> > > > >>
>>> > > > >>> Hi, Jiangjie,
>>> > > > >>>
>>> > > > >>> 3. If a client closes a socket connection, typically the server
>>> > > > >>> only finds this out by reading from the channel and getting a
>>> > > > >>> negative size during the read. So, if a channel is muted by the
>>> > > > >>> server, the server won't be able to detect the closing of the
>>> > > > >>> connection by the client after the socket is unmuted. That's
>>> > > > >>> probably what Rajini wants to convey.
>>> > > > >>>
>>> > > > >>> Thanks,
>>> > > > >>>
>>> > > > >>> Jun
>>> > > > >>>
>>> > > > >>> On Fri, Nov 3, 2017 at 8:11 PM, Becket Qin <
>>> becket....@gmail.com>
>>> > > > wrote:
>>> > > > >>>
>>> > > > >>> > Thanks Rajini.
>>> > > > >>> >
>>> > > > >>> > 1. Good point. We do need to bump up the protocol version so that
>>> > > > >>> > the new clients do not wait for another throttle time when they
>>> > > > >>> > are talking to old brokers. I'll update the KIP.
>>> > > > >>> >
>>> > > > >>> > 2. That is true. But the client was not supposed to send requests
>>> > > > >>> > to the broker during that period anyway. So detecting the broker
>>> > > > >>> > failure later seems fine?
>>> > > > >>> >
>>> > > > >>> > 3. Wouldn't the number of CLOSE_WAIT handles be the same as in the
>>> > > > >>> > current state? Currently the broker will still mute the socket
>>> > > > >>> > until it sends the response back. If the clients disconnect while
>>> > > > >>> > they are being throttled, the closed socket will not be detected
>>> > > > >>> > until the throttle time has passed.
>>> > > > >>> >
>>> > > > >>> > Jun also suggested bounding the time by metrics.sample.window.ms
>>> > > > >>> > in the ticket. I am not sure about the bound on throttle time. It
>>> > > > >>> > seems a little difficult to come up with a good bound. If the
>>> > > > >>> > bound is too large, it does not really help solve the various
>>> > > > >>> > timeout issues we may face. If the bound is too low, the quota is
>>> > > > >>> > essentially not honored. We may potentially treat different
>>> > > > >>> > requests differently, but that seems too complicated and error
>>> > > > >>> > prone.
>>> > > > >>> >
>>> > > > >>> > IMO, the key improvement we want to make is to tell the clients
>>> > > > >>> > how long they will be throttled so that the clients know what
>>> > > > >>> > happened and can act accordingly instead of waiting naively.
>>> > > > >>> > Muting the socket on the broker side is just in case of
>>> > > > >>> > non-cooperating clients. For the existing clients, it seems this
>>> > > > >>> > does not have much impact compared with what we have now.
>>> > > > >>> >
>>> > > > >>> > Thanks,
>>> > > > >>> >
>>> > > > >>> > Jiangjie (Becket) Qin
>>> > > > >>> >
>>> > > > >>> >
>>> > > > >>> >
>>> > > > >>> > On Fri, Nov 3, 2017 at 3:09 PM, Rajini Sivaram <
>>> > > > >>> rajinisiva...@gmail.com>
>>> > > > >>> > wrote:
>>> > > > >>> >
>>> > > > >>> > > Hi Becket,
>>> > > > >>> > >
>>> > > > >>> > > Thank you for the KIP. A few comments:
>>> > > > >>> > >
>>> > > > >>> > > 1. The KIP says: "*No public interface changes are needed. We only
>>> > > > >>> > > propose behavior change on the broker side.*"
>>> > > > >>> > >
>>> > > > >>> > > But from the proposed changes, it sounds like clients will be
>>> > > > >>> > > updated to wait for throttle-time before sending the next request,
>>> > > > >>> > > and also not handle idle disconnections during that time. Doesn't
>>> > > > >>> > > that mean that clients need to know that the broker has sent the
>>> > > > >>> > > response before throttling, requiring a protocol/version change?
>>> > > > >>> > >
>>> > > > >>> > >
>>> > > > >>> > > 2. At the moment, broker failures are detected by clients (and
>>> > > > >>> > > vice versa) within connections.max.idle.ms. By removing this check
>>> > > > >>> > > for an unlimited throttle time, failure detection could be
>>> > > > >>> > > delayed.
>>> > > > >>> > >
>>> > > > >>> > >
>>> > > > >>> > > 3. The KIP says: "*Since this subsequent request is not actually
>>> > > > >>> > > handled until the broker unmutes the channel, the client can hit
>>> > > > >>> > > request.timeout.ms and reconnect. However, this is no worse than
>>> > > > >>> > > the current state.*"
>>> > > > >>> > >
>>> > > > >>> > > I think this could be worse than the current state because the
>>> > > > >>> > > broker doesn't detect and close the channel for an unlimited
>>> > > > >>> > > throttle time, while new connections will get accepted. As a
>>> > > > >>> > > result, a lot of connections could be in CLOSE_WAIT state when
>>> > > > >>> > > throttle time is high.
>>> > > > >>> > >
>>> > > > >>> > >
>>> > > > >>> > > Perhaps it is better to combine this KIP with a bound on throttle
>>> > > > >>> > > time?
>>> > > > >>> > >
>>> > > > >>> > >
>>> > > > >>> > > Regards,
>>> > > > >>> > >
>>> > > > >>> > >
>>> > > > >>> > > Rajini
>>> > > > >>> > >
>>> > > > >>> > >
>>> > > > >>> > > On Fri, Nov 3, 2017 at 8:09 PM, Becket Qin <
>>> > becket....@gmail.com
>>> > > >
>>> > > > >>> wrote:
>>> > > > >>> > >
>>> > > > >>> > > > Thanks for the comment, Jun,
>>> > > > >>> > > >
>>> > > > >>> > > > 1. Yes, you are right. This could also happen with the current
>>> > > > >>> > > > quota mechanism because we are essentially muting the socket
>>> > > > >>> > > > during throttle time. There might be two ways to solve this.
>>> > > > >>> > > > A) use another socket to send heartbeat.
>>> > > > >>> > > > B) let the GroupCoordinator know that the client will not
>>> > > > >>> > > > heartbeat for some time.
>>> > > > >>> > > > It seems the first solution is cleaner.
>>> > > > >>> > > >
>>> > > > >>> > > > 2. For the consumer it seems returning an empty response is a
>>> > > > >>> > > > better option. In the producer case, if there is a spike in
>>> > > > >>> > > > traffic, the brokers will see queued up requests, but that is hard
>>> > > > >>> > > > to avoid unless we have connection level quota, which is a bigger
>>> > > > >>> > > > change and may be easier to discuss in a separate KIP.
>>> > > > >>> > > >
>>> > > > >>> > > > Thanks,
>>> > > > >>> > > >
>>> > > > >>> > > > Jiangjie (Becket) Qin
>>> > > > >>> > > >
>>> > > > >>> > > >
>>> > > > >>> > > > On Fri, Nov 3, 2017 at 10:28 AM, Jun Rao <
>>> j...@confluent.io>
>>> > > > wrote:
>>> > > > >>> > > >
>>> > > > >>> > > > > Hi, Jiangjie,
>>> > > > >>> > > > >
>>> > > > >>> > > > > Thanks for bringing this up. A couple of quick thoughts.
>>> > > > >>> > > > >
>>> > > > >>> > > > > 1. If the throttle time is large, what can happen is that a
>>> > > > >>> > > > > consumer won't be able to heartbeat to the group coordinator
>>> > > > >>> > > > > frequently enough. In that case, even with this KIP, it seems
>>> > > > >>> > > > > there could be frequent consumer group rebalances.
>>> > > > >>> > > > >
>>> > > > >>> > > > > 2. If we return a response immediately, for the consumer, do we
>>> > > > >>> > > > > return the requested data or an empty response? If we do the
>>> > > > >>> > > > > former, it may not protect against the case when there are
>>> > > > >>> > > > > multiple consumer instances associated with the same
>>> > > > >>> > > > > user/clientid.
>>> > > > >>> > > > >
>>> > > > >>> > > > > Jun
>>> > > > >>> > > > >
>>> > > > >>> > > > > On Wed, Nov 1, 2017 at 9:53 AM, Becket Qin <
>>> > > > becket....@gmail.com
>>> > > > >>> >
>>> > > > >>> > > wrote:
>>> > > > >>> > > > >
>>> > > > >>> > > > > > Hi,
>>> > > > >>> > > > > >
>>> > > > >>> > > > > > We would like to start the discussion on KIP-219.
>>> > > > >>> > > > > >
>>> > > > >>> > > > > > The KIP tries to improve quota throttling time communication
>>> > > > >>> > > > > > between brokers and clients to avoid client timeouts in case of
>>> > > > >>> > > > > > long throttling time.
>>> > > > >>> > > > > >
>>> > > > >>> > > > > > The KIP link is as follows:
>>> > > > >>> > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-219+-+Improve+quota+communication
>>> > > > >>> > > > > >
>>> > > > >>> > > > > > Comments are welcome.
>>> > > > >>> > > > > >
>>> > > > >>> > > > > > Thanks,
>>> > > > >>> > > > > >
>>> > > > >>> > > > > > Jiangjie (Becket) Qin
>>> > > > >>> > > > > >
>>> > > > >>> > > > >
>>> > > > >>> > > >
>>> > > > >>> > >
>>> > > > >>> >
>>> > > > >>>
>>> > > > >>
>>> > > > >>
>>> > > > >
>>> > > >
>>> > >
>>> >
>>>
>>
>>
>

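For reference, the cooperative-client behavior discussed in the thread
(back off for the throttle time before sending the next request to that
broker) could look roughly like the sketch below. This is only an
illustration under my own assumptions: the class and method names are
hypothetical, and time is passed in explicitly to keep the example
self-contained.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of client-side throttle bookkeeping.
class ClientThrottleState {
    // Per-broker timestamp (ms) before which no new request should be sent.
    private final Map<String, Long> throttledUntilMs = new HashMap<>();

    // Record the throttle time carried in a response received at nowMs.
    void onResponse(String brokerId, long nowMs, int throttleTimeMs) {
        if (throttleTimeMs > 0) {
            // Keep the later deadline if the broker is already throttled.
            throttledUntilMs.merge(brokerId, nowMs + throttleTimeMs, Math::max);
        }
    }

    // How long the client should still wait before sending to this broker.
    long remainingDelayMs(String brokerId, long nowMs) {
        Long until = throttledUntilMs.get(brokerId);
        return until == null ? 0L : Math.max(0L, until - nowMs);
    }

    boolean canSend(String brokerId, long nowMs) {
        return remainingDelayMs(brokerId, nowMs) == 0L;
    }
}
```

The broker-side mute then only matters for clients that ignore the
throttle time and send anyway.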