On Tue, Nov 12, 2019, at 10:27, Brian Byrne wrote:
> Hi Colin,
> 
> Thanks for the feedback. I'm going to leave out some specifics in my
> response, since I'll go back to the KIP, revise it, and then post an update
> on the original discussion thread. I'll make two primary changes, (1)
> remove reference to expiry not being calculated appropriately, since this
> was determined to be a bug and is therefore being fixed aside from the KIP,
> and (2) be more specific with the details surrounding the actual
> implementation.
> 
> > Maybe I missed it, but there seemed to be some details missing here.  When
> > do we start the metadata fetch?  For example, if topic metadata expires
> > every 5 minutes, perhaps we should wait 4 minutes, then start fetching
> > the new metadata, which we would expect to arrive by the 5 minute
> > deadline.  Or perhaps we should start the fetch even earlier, around the
> > 2.5 minute mark.  In any case, there should be some discussion about what
> > the actual policy is.  Given that metadata.max.age.ms is configurable,
> > maybe that policy  needs to be expressed in terms of a percentage of the
> > refresh period rather than in terms of an absolute delay.
> >
> 
> Sure, I can add this to the KIP, although I didn't want to make it too
> specific other than "at a reasonable time," which could practically be 2
> seconds before the expiry if it's reliable, or could be half the max age
> duration since we expect the cost to be negligible (with appropriate
> improvements).
>

Thanks again for tackling this, Brian.  I think it's important to be specific 
about what we're proposing here.

Two seconds doesn't seem like a reasonable amount of time to leave for the 
metadata fetch.  Fetching halfway through the expiration period seems more 
reasonable.  It also doesn't require us to create a new configuration key, 
which is nice.

Another option is to just do the metadata fetch every metadata.max.age.ms, but 
not expire the topic until we can't fetch the metadata for 2 * 
metadata.max.age.ms.
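To make that second option concrete, here's a minimal sketch of the "refresh at max age, expire only after 2x max age without a successful fetch" policy. The class and method names are hypothetical, not actual producer internals:

```java
// Hypothetical sketch: refresh a topic's metadata once it is older than
// metadata.max.age.ms, but only evict the topic from the cache after
// 2 * metadata.max.age.ms has passed without a successful fetch.
public class TopicMetadataState {
    private final long metadataMaxAgeMs;
    private long lastSuccessfulFetchMs;

    public TopicMetadataState(long metadataMaxAgeMs, long nowMs) {
        this.metadataMaxAgeMs = metadataMaxAgeMs;
        this.lastSuccessfulFetchMs = nowMs;
    }

    // Trigger a refresh once the metadata is at least max age old.
    public boolean shouldRefresh(long nowMs) {
        return nowMs - lastSuccessfulFetchMs >= metadataMaxAgeMs;
    }

    // Only expire (evict) after twice the max age without a success,
    // so transient fetch failures don't drop the topic from the cache.
    public boolean shouldExpire(long nowMs) {
        return nowMs - lastSuccessfulFetchMs >= 2 * metadataMaxAgeMs;
    }

    public void onFetchSuccess(long nowMs) {
        lastSuccessfulFetchMs = nowMs;
    }
}
```

The nice property is that a single failed refresh leaves a full max-age window to retry before the topic is actually dropped.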

We should be specific about what happens if the first few metadata fetches 
fail.  Do we use exponential backoff to decide when to resend?  It seems like 
we really should, for all the usual reasons (reduce the load on brokers, ride 
out temporary service disruptions, etc.)  Maybe we could have an exponential 
retry backoff for each broker (in other words, we should try to contact a 
different broker before applying the backoff.)  I think this already sort of 
happens with the disconnect timeout, but we might need a more general solution.
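A per-broker backoff along those lines could look something like the following sketch (all names here are illustrative, not existing client code): each broker tracks its own failure count, so the client can try a different broker immediately instead of waiting out the backoff on the one that just failed.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch of per-broker exponential retry backoff for
// metadata fetches. A failure against broker A doesn't delay an
// attempt against broker B.
public class BrokerBackoff {
    private final long baseMs;
    private final long maxMs;
    private final Map<Integer, Integer> failures = new HashMap<>();

    public BrokerBackoff(long baseMs, long maxMs) {
        this.baseMs = baseMs;
        this.maxMs = maxMs;
    }

    // Exponential backoff for one broker: base * 2^(failures - 1),
    // capped at maxMs. Zero failures means no backoff at all.
    public long backoffMs(int brokerId) {
        int n = failures.getOrDefault(brokerId, 0);
        if (n == 0) return 0;
        long backoff = baseMs * (1L << Math.min(n - 1, 20));
        return Math.min(backoff, maxMs);
    }

    public void onFailure(int brokerId) {
        failures.merge(brokerId, 1, Integer::sum);
    }

    public void onSuccess(int brokerId) {
        failures.remove(brokerId);
    }
}
```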

> 
> > The KIP correctly points out that the current metadata fetching policy
> > causes us to "[block] in a function that's advertised as asynchronous."
> > However, the KIP doesn't seem to spell out whether we will continue to
> > block if metadata can't be found, or if this will be abolished.  Clearly,
> > starting the metadata fetch early will reduce blocking in the common case,
> > but will there still be blocking in the uncommon case where the early fetch
> > doesn't succeed in time?
> >
> 
> The goal is to be fully asynchronous for metadata resolution. Currently,
> the producer will block for a limited amount of time 'max.block.ms', which
> defaults to 60 seconds. If the metadata cannot be fetched during that
> period, a TimeoutException is thrown; however, the proposed change would
> return the KafkaProducer::send()'s resulting Future<> immediately, and that
> timeout would eventually be passed via the future. If the metadata fetch is
> slow, then that record will be enqueued for processing once the metadata is
> resolved in a manner that's invisible to the client.

Thanks for the clarification.  Fully asynchronous is the way to go, I agree.  
I'm having trouble understanding how timeouts are handled in the KIP.  It seems 
like if we can't fetch the metadata within the designated metadata timeout, the 
future / callback should receive a TimeoutException, right?  We do not want the 
send call to be deferred forever if metadata can't be fetched.  Eventually it 
should fail if it can't be performed.

I do think this is something that will have to be mentioned in the 
compatibility section.  There is some code out there that is probably prepared 
to handle a timeout exception from the send function, which may need to be 
updated to check for a timeout from the future or callback.
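To illustrate the compatibility point: code that today catches a timeout around the send() call itself would also need to check the returned future, where the exception surfaces wrapped in an ExecutionException. The sketch below uses CompletableFuture and java.util.concurrent.TimeoutException as stand-ins (the producer's actual class is org.apache.kafka.common.errors.TimeoutException, and its future is not a CompletableFuture):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

// Stand-in demo: with fully async sends, a metadata timeout no longer
// throws from send() itself; it arrives through the returned future,
// wrapped in an ExecutionException when the caller calls get().
public class AsyncTimeoutDemo {
    public static String completeOrTimeout(CompletableFuture<String> future) {
        try {
            return future.get();                   // blocks until completion
        } catch (ExecutionException e) {
            if (e.getCause() instanceof TimeoutException) {
                return "timed out via future";     // the new failure path
            }
            return "other failure";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }
}
```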

> 
> There will still be blocking in the event of memory pressure, but there's
> no good way around that (other than to fail the request immediately). The
> KIP should elaborate on how to deal with memory pressure, since there's a
> higher likelihood of filling the buffers while metadata fetching is
> outstanding, which invites starvation. There are ways to handle this; I will
> expand on this in the KIP.

It seems like this is an existing problem.  You may fire off a lot of send 
calls that get blocked because the broker that is the leader for a certain 
partition is not responding.  I'm not sure that we need to do anything special 
here.  On the other hand, we could make the case for a generic "max number of 
outstanding sends" configuration to prevent surprise OOMs in the existing 
cases, plus the new one we're adding.  But this feels like a bit of a scope 
expansion.
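For what it's worth, such a "max number of outstanding sends" bound could be as simple as a semaphore that releases a permit when each send's future completes. This is purely a sketch of the idea, not existing producer behavior:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;

// Hypothetical sketch: cap the number of in-flight sends to avoid
// surprise OOMs when many sends are blocked on metadata or a slow
// leader. A permit is returned when the send's future completes.
public class BoundedSender {
    private final Semaphore permits;

    public BoundedSender(int maxOutstanding) {
        this.permits = new Semaphore(maxOutstanding);
    }

    // Returns false if the bound would be exceeded; the caller then
    // decides whether to block, drop, or fail the record.
    public boolean trySend(CompletableFuture<Void> sendResult) {
        if (!permits.tryAcquire()) {
            return false;
        }
        sendResult.whenComplete((r, e) -> permits.release());
        return true;
    }

    public int available() {
        return permits.availablePermits();
    }
}
```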

> 
> 
> > Can you clarify this part a bit?  It seems like we have a metadata
> > expiration policy now for topics, and we will have one after this KIP, but
> > they will be somewhat different.  But it's not clear to me what the
> > differences are.
> >
> 
> What the KIP is proposing is to expire topic metadata only if it hasn't
> been accessed within the max age threshold. Currently, the '
> metadata.max.age.ms' documents how frequently to refresh potentially-stale
> topic metadata, but expiry (that is, evict from the Producer's metadata
> cache entirely) is not connected to this value - it's the hard-coded 5
> minutes. While these aren't exactly the same concepts, it was suggested
> from the discussion thread to simplify things and make these values
> connected.

They may be connected, but I'm not sure they should be the same.  Perhaps 
expiry should be 4x the typical fetch interval, for example.

> 
> In general, if load is a problem, we should probably consider adding some
> > kind of jitter on the client side.  There are definitely cases where people
> > start up a lot of clients at the same time in parallel and there is a
> > thundering herd problem with metadata updates.  Adding jitter would spread
> > the load across time rather than creating a spike every 5 minutes in this
> > case.
> >
> 
> There's actually a deeper issue that jitter wouldn't fix (yet) - it's that
> metadata for *all* active topics is fetched on every metadata request,
> which is an N-squared problem today (first encountered topic T1 fetches
> T1's metadata, second topic T2 fetches T1+T2's metadata, and so on). So if
> the number of topics is large, these can be a heavy RPCs - so much so that,
> in degenerate cases, it was reported that clients were hitting their
> configured timeout while awaiting the metadata fetch results. So tracking
> when metadata was last refreshed isn't necessarily about the 5-minute
> refresh (although in steady state, it could be), but rather how much
> metadata to fetch when such a request is performed.
> 
> Once that's in place, jitter or some probabilistic fetching would help
> spread the load.

Hmm.... are you sure this is an N^2 problem?  If you have T1 and T2, you fetch 
metadata for T1 and T2, right?  I guess you could argue that we often fetch 
metadata for partitions we don't care about, but that doesn't make it O(N^2).  
Maybe there's something about the implementation that I'm missing.

In general, we need to take advantage of batching to do this well (one reason 
why I think we should steer clear of ultra-granular timeout tracking).  It's 
best to do 1 RPC asking for 10 topics, not 10 RPCs asking for a single topic 
each, even if that means some of the topic timeouts are not *exactly* aligned 
with the configured value.
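As a sketch of that batching idea (names are illustrative, not producer internals): instead of firing one RPC per topic the moment its timeout fires, gather every topic whose refresh deadline falls within a small window and fetch them together, accepting that some refreshes run slightly early.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Illustrative sketch: batch metadata refreshes by draining all topics
// whose deadline is due now or within the next windowMs, so ten nearby
// timeouts become one RPC rather than ten.
public class MetadataBatcher {
    private final Map<String, Long> deadlines = new HashMap<>();

    public void setDeadline(String topic, long deadlineMs) {
        deadlines.put(topic, deadlineMs);
    }

    // One batch: every topic due at or before nowMs + windowMs.
    public List<String> drainDue(long nowMs, long windowMs) {
        List<String> batch = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = deadlines.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> e = it.next();
            if (e.getValue() <= nowMs + windowMs) {
                batch.add(e.getKey());
                it.remove();
            }
        }
        return batch;
    }
}
```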

best,
Colin


> 
> Thanks,
> Brian
> 
> On Mon, Nov 11, 2019 at 11:47 AM Colin McCabe <cmcc...@apache.org> wrote:
> 
> > Hi Brian,
> >
> > Thanks for the KIP.
> >
> > Starting the metadata fetch before we need the result is definitely a
> > great idea.  This way, the metadata fetch can be done in parallel with all
> > the other stuff the producer is doing, rather than forcing the producer to
> > come to a halt periodically while metadata is fetched.
> >
> > Maybe I missed it, but there seemed to be some details missing here.  When
> > do we start the metadata fetch?  For example, if topic metadata expires
> > every 5 minutes, perhaps we should wait 4 minutes, then start fetching
> > the new metadata, which we would expect to arrive by the 5 minute
> > deadline.  Or perhaps we should start the fetch even earlier, around the
> > 2.5 minute mark.  In any case, there should be some discussion about what
> > the actual policy is.  Given that metadata.max.age.ms is configurable,
> > maybe that policy needs to be expressed in terms of a percentage of the
> > refresh period rather than in terms of an absolute delay.
> >
> > The KIP correctly points out that the current metadata fetching policy
> > causes us to "[block] in a function that's advertised as asynchronous."
> > However, the KIP doesn't seem to spell out whether we will continue to
> > block if metadata can't be found, or if this will be abolished.  Clearly,
> > starting the metadata fetch early will reduce blocking in the common case,
> > but will there still be blocking in the uncommon case where the early fetch
> > doesn't succeed in time?
> >
> >  > To address (2), the producer currently maintains an expiry threshold
> > for
> >  > every topic, which is used to remove a topic from the working set at a
> >  > future time (currently hard-coded to 5 minutes, this should be modified
> > to
> >  > use metadata.max.age.ms). While this does work to reduce the size of
> > the
> >  > topic working set, the producer will continue fetching metadata for
> > these
> >  > topics in every metadata request for the full expiry duration. This
> > logic
> >  > can be made more intelligent by managing the expiry from when the topic
> >  > was last used, enabling the expiry duration to be reduced to improve
> > cases
> >  > where a large number of topics are touched intermittently.
> >
> > Can you clarify this part a bit?  It seems like we have a metadata
> > expiration policy now for topics, and we will have one after this KIP, but
> > they will be somewhat different.  But it's not clear to me what the
> > differences are.
> >
> > In general, if load is a problem, we should probably consider adding some
> > kind of jitter on the client side.  There are definitely cases where people
> > start up a lot of clients at the same time in parallel and there is a
> > thundering herd problem with metadata updates.  Adding jitter would spread
> > the load across time rather than creating a spike every 5 minutes in this
> > case.
> >
> > best,
> > Colin
> >
> >
> > On Fri, Nov 8, 2019, at 08:59, Ismael Juma wrote:
> > > I think this KIP affects when we block which is actually user visible
> > > behavior. Right?
> > >
> > > Ismael
> > >
> > > On Fri, Nov 8, 2019, 8:50 AM Brian Byrne <bby...@confluent.io> wrote:
> > >
> > > > Hi Guozhang,
> > > >
> > > > Regarding metadata expiry, no access times other than the initial
> > lookup[1]
> > > > are used for determining when to expire producer metadata. Therefore,
> > > > frequently used topics' metadata will be aged out and subsequently
> > > > refreshed (in a blocking manner) every five minutes, and infrequently
> > used
> > > > topics will be retained for a minimum of five minutes and currently
> > > > refetched on every metadata update during that time period. The
> > sentence is
> > > > suggesting that we could reduce the expiry time to improve the latter
> > > > without affecting (rather slightly improving) the former.
> > > >
> > > > Keep in mind that in most all cases, I wouldn't anticipate much of a
> > > > difference with producer behavior, and the extra logic can be
> > implemented
> > > > to have insignificant cost. It's the large/dynamic topic corner cases
> > that
> > > > we're trying to improve.
> > > >
> > > > It'd be convenient if the KIP is no longer necessary. You're right in
> > that
> > > > there's no public API changes and the behavioral changes are entirely
> > > > internal. I'd be happy to continue the discussion around the KIP, but
> > > > unless otherwise objected, it can be retired.
> > > >
> > > > [1] Not entirely accurate, it's actually the first time when the client
> > > > calculates whether to retain the topic in its metadata.
> > > >
> > > > Thanks,
> > > > Brian
> > > >
> > > > On Thu, Nov 7, 2019 at 4:48 PM Guozhang Wang <wangg...@gmail.com>
> > wrote:
> > > >
> > > > > Hello Brian,
> > > > >
> > > > > Could you elaborate a bit more on this sentence: "This logic can be
> > made
> > > > > more intelligent by managing the expiry from when the topic was last
> > > > used,
> > > > > enabling the expiry duration to be reduced to improve cases where a
> > large
> > > > > number of topics are touched intermittently." Not sure I fully
> > understand
> > > > > the proposal.
> > > > >
> > > > > Also since now this KIP did not make any public API changes and the
> > > > > behavioral changes are not considered a public API contract (i.e.
> > how we
> > > > > maintain the topic metadata in producer cache is never committed to
> > > > users),
> > > > > I wonder if we still need a KIP for the proposed change any more?
> > > > >
> > > > >
> > > > > Guozhang
> > > > >
> > > > > On Thu, Nov 7, 2019 at 12:43 PM Brian Byrne <bby...@confluent.io>
> > wrote:
> > > > >
> > > > > > Hello all,
> > > > > >
> > > > > > I'd like to propose a vote for a producer change to improve
> > producer
> > > > > > behavior when dealing with a large number of topics, in part by
> > > > reducing
> > > > > > the amount of metadata fetching performed.
> > > > > >
> > > > > > The full KIP is provided here:
> > > > > >
> > > > > >
> > > > >
> > > >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-526%3A+Reduce+Producer+Metadata+Lookups+for+Large+Number+of+Topics
> > > > > >
> > > > > > And the discussion thread:
> > > > > >
> > > > > >
> > > > >
> > > >
> > https://lists.apache.org/thread.html/b2f8f830ef04587144cf0840c7d4811bbf0a14f3c459723dbc5acf9e@%3Cdev.kafka.apache.org%3E
> > > > > >
> > > > > > Thanks,
> > > > > > Brian
> > > > > >
> > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > > >
> > > >
> > >
> >
>
