Hi Rajini,

We are using SSL so we could use user quota. But I am not sure if that
would solve the problem. The key issue in our case is that each broker can
only handle ~300 MB/s of incoming bytes, but the MapReduce job is trying to
push 1-2 GB/s, unless we can throttle the clients to 300 MB/s, the broker
cannot sustain. In order to do that, we need to be able to throttle
requests for more than request timeout, potentially a few minutes. It seems
neither user quota nor limited throttle time can achieve this.

Thanks,

Jiangjie (Becket) Qin

On Tue, Nov 14, 2017 at 7:44 AM, Rajini Sivaram <rajinisiva...@gmail.com>
wrote:

> Hi Becket,
>
> For the specific scenario that you described, would it be possible to use
> user quotas rather than client-id quotas? With user quotas, perhaps we can
> throttle more easily before reading requests as well (as you mentioned, the
> difficulty with client-id quota is that we have to read partial requests
> and handle client-ids at network layer making that a much bigger change).
> If your clients are using SASL/SSL, I was wondering whether a solution that
> improves user quotas and limits throttle time would work for you.
>
> Regards,
>
> Rajini
>
>
>
> On Thu, Nov 9, 2017 at 12:59 AM, Becket Qin <becket....@gmail.com> wrote:
>
> > Since we will bump up the wire request version, another option is for
> > clients that are sending old request versions the broker can just keep
> the
> > current behavior. For clients sending the new request versions, the
> broker
> > can respond then mute the channel as described in the KIP wiki. In this
> > case, muting the channel is mostly for protection purpose. A correctly
> > implemented client should back off for throttle time before sending the
> > next request. The downside is that the broker needs to keep both logic
> and
> > it seems not gaining much benefit. So personally I prefer to just mute
> the
> > channel. But I am open to different opinions.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Mon, Nov 6, 2017 at 7:28 PM, Becket Qin <becket....@gmail.com> wrote:
> >
> > > Hi Jun,
> > >
> > > Hmm, even if a connection is closed by the client when the channel is
> > > muted. After the channel is unmuted, it seems Selector.select() will
> > detect
> > > this and close the socket.
> > > It is true that before the channel is unmuted the socket will be in a
> > > CLOSE_WAIT state though. So having an arbitrarily long muted duration
> may
> > > indeed cause problem.
> > >
> > > Thanks,
> > >
> > > Jiangjie (Becket) Qin
> > >
> > > On Mon, Nov 6, 2017 at 7:22 PM, Becket Qin <becket....@gmail.com>
> wrote:
> > >
> > >> Hi Rajini,
> > >>
> > >> Thanks for the detail explanation. Please see the reply below:
> > >>
> > >> 2. Limiting the throttle time to connection.max.idle.ms on the broker
> > >> side is probably fine. However, clients may have a different
> > configuration
> > >> of connection.max.idle.ms and still reconnect before the throttle
> time
> > >> (which is the server side connection.max.idle.ms). It seems another
> > back
> > >> door for quota.
> > >>
> > >> 3. I agree we could just mute the server socket until
> > >> connection.max.idle.ms if the massive CLOSE_WAIT is a big issue. This
> > >> helps guarantee only connection_rate * connection.max.idle.ms sockets
> > >> will be in CLOSE_WAIT state. For cooperative clients, unmuting the
> > socket
> > >> will not have negative impact.
> > >>
> > >> 4. My concern for capping the throttle time to metrics.window.ms is
> > that
> > >> we will not be able to enforce quota effectively. It might be useful
> to
> > >> explain this with a real example we are trying to solve. We have a
> > >> MapReduce job pushing data to a Kafka cluster. The MapReduce job has
> > >> hundreds of producers and each of them sends a normal sized
> > ProduceRequest
> > >> (~2 MB) to each of the brokers in the cluster. Apparently the client
> id
> > >> will ran out of bytes quota pretty quickly, and the broker started to
> > >> throttle the producers. The throttle time could actually be pretty
> long
> > >> (e.g. a few minute). At that point, request queue time on the brokers
> > was
> > >> around 30 seconds. After that, a bunch of producer hit
> > request.timeout.ms
> > >> and reconnected and sent the next request again, which causes another
> > spike
> > >> and a longer queue.
> > >>
> > >> In the above case, unless we set the quota window to be pretty big, we
> > >> will not be able to enforce the quota. And if we set the window size
> to
> > a
> > >> large value, the request might be throttled for longer than
> > >> connection.max.idle.ms.
> > >>
> > >> > We need a solution to improve flow control for well-behaved clients
> > >> > which currently rely entirely on broker's throttling. The KIP
> > addresses
> > >> > this using co-operative clients that sleep for an unbounded throttle
> > >> time.
> > >> > I feel this is not ideal since the result is traffic with a lot of
> > >> spikes.
> > >> > Feedback from brokers to enable flow control in the client is a good
> > >> idea,
> > >> > but clients with excessive throttle times should really have been
> > >> > configured with smaller batch sizes.
> > >>
> > >> This is not really about a single producer with large size, it is a
> lot
> > >> of small producers talking to the client at the same time. Reducing
> the
> > >> batch size does not help much here. Also note that after the spike
> > >> traffic at the very beginning, the throttle time of the
> ProduceRequests
> > >> processed later are actually going to be increasing (for example, the
> > first
> > >> throttled request will be throttled for 1 second, the second throttled
> > >> request will be throttled for 10 sec, etc.). Due to the throttle time
> > >> variation, if every producer honors the throttle time, there will not
> > be a
> > >> next spike after the first produce.
> > >>
> > >> > We need a solution to enforce smaller quotas to protect the broker
> > >> > from misbehaving clients. The KIP addresses this by muting channels
> > for
> > >> an
> > >> > unbounded time. This introduces problems of channels in CLOSE_WAIT.
> > And
> > >> > doesn't really solve all issues with misbehaving clients since new
> > >> > connections can be created to bypass quotas.
> > >>
> > >> Our current quota only protects cooperating clients because our quota
> is
> > >> really throttling the NEXT request after process a request even if
> this
> > >> request itself has already violated quota. The misbehaving client are
> > not
> > >> protected at all with the current quota mechanism. Like you
> mentioned, a
> > >> connection quota is required. We have been discussing about this at
> > >> LinkedIn for some time. Doing it right requires some major changes
> such
> > as
> > >> partially reading a request to identify the client id at network level
> > and
> > >> disconnect misbehaving clients.
> > >>
> > >> While handling misbehaving clients is important, we are not trying to
> > >> address that in this KIP. This KIP is trying to improve the
> > communication
> > >> with good clients. Muting the channel is more of a migration plan so
> > that
> > >> we don't have regression on the old innocent (but good) clients.
> > >>
> > >> Thanks,
> > >>
> > >> Jiangjie (Becket) Qin
> > >>
> > >>
> > >> On Mon, Nov 6, 2017 at 1:33 PM, Jun Rao <j...@confluent.io> wrote:
> > >>
> > >>> Hi, Jiangjie,
> > >>>
> > >>> 3. If a client closes a socket connection, typically the server only
> > >>> finds
> > >>> this out by reading from the channel and getting a negative size
> during
> > >>> the
> > >>> read. So, if a channel is muted by the server, the server won't be
> able
> > >>> to
> > >>> detect the closing of the connection by the client after the socket
> is
> > >>> unmuted. That's probably what Rajini wants to convey.
> > >>>
> > >>> Thanks,
> > >>>
> > >>> Jun
> > >>>
> > >>> On Fri, Nov 3, 2017 at 8:11 PM, Becket Qin <becket....@gmail.com>
> > wrote:
> > >>>
> > >>> > Thanks Rajini.
> > >>> >
> > >>> > 1. Good point. We do need to bump up the protocol version so that
> the
> > >>> new
> > >>> > clients do not wait for another throttle time when they are talking
> > to
> > >>> old
> > >>> > brokers. I'll update the KIP.
> > >>> >
> > >>> > 2. That is true. But the client was not supposed to send request to
> > the
> > >>> > broker during that period anyways. So detecting the broker failure
> > >>> later
> > >>> > seems fine?
> > >>> >
> > >>> > 3. Wouldn't the CLOSE_WAIT handler number be the same as the
> current
> > >>> state?
> > >>> > Currently the broker will still mute the socket until it sends the
> > >>> response
> > >>> > back. If the clients disconnect while they are being throttled, the
> > >>> closed
> > >>> > socket will not be detected until the throttle time has passed.
> > >>> >
> > >>> > Jun also suggested to bound the time by metric.sample.window.ms in
> > the
> > >>> > ticket. I am not sure about the bound on throttle time. It seems a
> > >>> little
> > >>> > difficult to come up with a good bound. If the bound is too large,
> it
> > >>> does
> > >>> > not really help solve the various timeout issue we may face. If the
> > >>> bound
> > >>> > is too low, the quota is essentially not honored. We may
> potentially
> > >>> treat
> > >>> > different requests differently, but that seems too complicated and
> > >>> error
> > >>> > prone.
> > >>> >
> > >>> > IMO, the key improvement we want to make is to tell the clients how
> > >>> long
> > >>> > they will be throttled so the clients knows what happened so they
> can
> > >>> act
> > >>> > accordingly instead of waiting naively. Muting the socket on the
> > broker
> > >>> > side is just in case of non-cooperating clients. For the existing
> > >>> clients,
> > >>> > it seems this does not have much impact compare with what we have
> > now.
> > >>> >
> > >>> > Thanks,
> > >>> >
> > >>> > Jiangjie (Becket) Qin
> > >>> >
> > >>> >
> > >>> >
> > >>> > On Fri, Nov 3, 2017 at 3:09 PM, Rajini Sivaram <
> > >>> rajinisiva...@gmail.com>
> > >>> > wrote:
> > >>> >
> > >>> > > Hi Becket,
> > >>> > >
> > >>> > > Thank you for the KIP. A few comments:
> > >>> > >
> > >>> > > 1.KIP says:  "*No public interface changes are needed. We only
> > >>> propose
> > >>> > > behavior change on the broker side.*"
> > >>> > >
> > >>> > > But from the proposed changes, it sounds like clients will be
> > >>> updated to
> > >>> > > wait for throttle-time before sending next response, and also not
> > >>> handle
> > >>> > > idle disconnections during that time. Doesn't that mean that
> > clients
> > >>> need
> > >>> > > to know that the broker has sent the response before throttling,
> > >>> > requiring
> > >>> > > protocol/version change?
> > >>> > >
> > >>> > >
> > >>> > > 2. At the moment, broker failures are detected by clients (and
> vice
> > >>> > versa)
> > >>> > > within connections.max.idle.ms. By removing this check for an
> > >>> unlimited
> > >>> > > throttle time, failure detection could be delayed.
> > >>> > >
> > >>> > >
> > >>> > > 3. KIP says  "*Since this subsequent request is not actually
> > handled
> > >>> > until
> > >>> > > the broker unmutes the channel, the client can hit
> > >>> request.timeout.ms
> > >>> > > <http://request.timeout.ms> and reconnect. However, this is no
> > worse
> > >>> > than
> > >>> > > the current state.*"
> > >>> > >
> > >>> > > I think this could be worse than the current state because broker
> > >>> doesn't
> > >>> > > detect and close the channel for an unlimited throttle time,
> while
> > >>> new
> > >>> > > connections will get accepted. As a result, lot of connections
> > could
> > >>> be
> > >>> > in
> > >>> > > CLOSE_WAIT state when throttle time is high.
> > >>> > >
> > >>> > >
> > >>> > > Perhaps it is better to combine this KIP with a bound on throttle
> > >>> time?
> > >>> > >
> > >>> > >
> > >>> > > Regards,
> > >>> > >
> > >>> > >
> > >>> > > Rajini
> > >>> > >
> > >>> > >
> > >>> > > On Fri, Nov 3, 2017 at 8:09 PM, Becket Qin <becket....@gmail.com
> >
> > >>> wrote:
> > >>> > >
> > >>> > > > Thanks for the comment, Jun,
> > >>> > > >
> > >>> > > > 1. Yes, you are right. This could also happen with the current
> > >>> quota
> > >>> > > > mechanism because we are essentially muting the socket during
> > >>> throttle
> > >>> > > > time. There might be two ways to solve this.
> > >>> > > > A) use another socket to send heartbeat.
> > >>> > > > B) let the GroupCoordinator know that the client will not
> > >>> heartbeat for
> > >>> > > > some time.
> > >>> > > > It seems the first solution is cleaner.
> > >>> > > >
> > >>> > > > 2. For consumer it seems returning an empty response is a
> better
> > >>> > option.
> > >>> > > In
> > >>> > > > the producer case, if there is a spike in traffic. The brokers
> > >>> will see
> > >>> > > > queued up requests, but that is hard to avoid unless we have
> > >>> connection
> > >>> > > > level quota, which is a bigger change and may be easier to
> > discuss
> > >>> it
> > >>> > in
> > >>> > > a
> > >>> > > > separate KIP.
> > >>> > > >
> > >>> > > > Thanks,
> > >>> > > >
> > >>> > > > Jiangjie (Becket) Qin
> > >>> > > >
> > >>> > > >
> > >>> > > > On Fri, Nov 3, 2017 at 10:28 AM, Jun Rao <j...@confluent.io>
> > wrote:
> > >>> > > >
> > >>> > > > > Hi, Jiangjie,
> > >>> > > > >
> > >>> > > > > Thanks for bringing this up. A couple of quick thoughts.
> > >>> > > > >
> > >>> > > > > 1. If the throttle time is large, what can happen is that a
> > >>> consumer
> > >>> > > > won't
> > >>> > > > > be able to heart beat to the group coordinator frequent
> enough.
> > >>> In
> > >>> > that
> > >>> > > > > case, even with this KIP, it seems there could be frequent
> > >>> consumer
> > >>> > > group
> > >>> > > > > rebalances.
> > >>> > > > >
> > >>> > > > > 2. If we return a response immediately, for the consumer, do
> we
> > >>> > return
> > >>> > > > the
> > >>> > > > > requested data or an empty response? If we do the former, it
> > may
> > >>> not
> > >>> > > > > protect against the case when there are multiple consumer
> > >>> instances
> > >>> > > > > associated with the same user/clientid.
> > >>> > > > >
> > >>> > > > > Jun
> > >>> > > > >
> > >>> > > > > On Wed, Nov 1, 2017 at 9:53 AM, Becket Qin <
> > becket....@gmail.com
> > >>> >
> > >>> > > wrote:
> > >>> > > > >
> > >>> > > > > > Hi,
> > >>> > > > > >
> > >>> > > > > > We would like to start the discussion on KIP-219.
> > >>> > > > > >
> > >>> > > > > > The KIP tries to improve quota throttling time
> communication
> > >>> > between
> > >>> > > > > > brokers and clients to avoid clients timeout in case of
> long
> > >>> > > throttling
> > >>> > > > > > time.
> > >>> > > > > >
> > >>> > > > > > The KIP link is following:
> > >>> > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > >>> > > > > 219+-+Improve+quota+
> > >>> > > > > > communication
> > >>> > > > > >
> > >>> > > > > > Comments are welcome.
> > >>> > > > > >
> > >>> > > > > > Thanks,
> > >>> > > > > >
> > >>> > > > > > Jiangjie (Becket) Qin
> > >>> > > > > >
> > >>> > > > >
> > >>> > > >
> > >>> > >
> > >>> >
> > >>>
> > >>
> > >>
> > >
> >
>

Reply via email to