>
> For new (uncached) topics, one problem here is that we don't know which
> partition to map a record to in the event that it has a key or custom
> partitioner, so the RecordAccumulator wouldn't know which batch/broker it
> belongs. We'd need an intermediate record queue that subsequently moved the
> records into RecordAccumulators once metadata resolution was complete. For
> known topics, we don't currently block at all in waitOnMetadata.
>

You are right, I forget this fact, and the intermediate record queue will
help, but I have some questions

if we add an intermediate record queue in KafkaProducer, when should we
move the records into RecordAccumulators?
only NetworkClient is aware of the MetadataResponse, here is the
hierarchical structure of the related classes:
KafkaProducer
    Accumulator
    Sender
        NetworkClient
            metadataUpdater.handleCompletedMetadataResponse

so
1. we should also add a metadataUpdater to KafkaProducer?
2. if the topic really does not exists? the intermediate record queue will
become too large?
3. and should we `block` when the intermediate record queue is too large?
and this will again bring the blocking problem?



On Wed, Nov 20, 2019 at 12:40 AM Brian Byrne <bby...@confluent.io> wrote:

> Hi Deng,
>
> Thanks for the feedback.
>
> On Mon, Nov 18, 2019 at 6:56 PM deng ziming <dengziming1...@gmail.com>
> wrote:
>
> > hi, I reviewed the current code, the ProduceMetadata maintains an expiry
> > threshold for every topic, every time when we write to a topic we will
> set
> > the expiry time to -1 to indicate it should be updated, this does work to
> > reduce the size of the topic working set, but the producer will continue
> > fetching metadata for these topics in every metadata request for the full
> > expiry duration.
> >
>
> Indeed, you are correct, I terribly misread the code here. Fortunately this
> was only a minor optimization in the KIP that's no longer necessary.
>
>
> and we can improve the situation by 2 means:
> >     1. we maintain a refresh threshold for every topic which is for
> example
> > 0.8 * expiry_threshold, and when we send `MetadataRequest` to brokers we
> > just request unknownLeaderTopics + unknownPartitionTopics + topics
> > reach refresh threshold.
> >
>
> Right, this is similar to what I suggested, with a larger window on the
> "staleness" that permits for batching to an appropriate size (except if
> there's any unknown topics, you'd want to issue the request immediately).
>
>
>
> >     2. we don't invoke KafkaProducer#waitOnMetadata when we call
> > KafkaProducer#send because of we just send data to RecordAccumulator, and
> > before we send data to brokers we will invoke RecordAccumulator#ready(),
> so
> > we can only invoke waitOnMetadata to block when (number topics
> > reach refresh threshold)>(number of all known topics)*0.2.
> >
>
> For new (uncached) topics, one problem here is that we don't know which
> partition to map a record to in the event that it has a key or custom
> partitioner, so the RecordAccumulator wouldn't know which batch/broker it
> belongs. We'd need an intermediate record queue that subsequently moved the
> records into RecordAccumulators once metadata resolution was complete. For
> known topics, we don't currently block at all in waitOnMetadata.
>
> The last major point of minimizing producer startup metadata RPCs may still
> need to be improved, but this would be a large improvement on the current
> situation.
>
> Thanks,
> Brian
>
>
>
> > I think the above 2 ways are enough to solve the current problem.
> >
> > On Tue, Nov 19, 2019 at 3:20 AM Colin McCabe <cmcc...@apache.org> wrote:
> >
> > > On Mon, Nov 18, 2019, at 10:05, Brian Byrne wrote:
> > > > On Fri, Nov 15, 2019 at 5:08 PM Colin McCabe <cmcc...@apache.org>
> > wrote:
> > > >
> > > > > Two seconds doesn't seem like a reasonable amount of time to leave
> > for
> > > the
> > > > > metadata fetch.  Fetching halfway through the expiration period
> seems
> > > more
> > > > > reasonable.  It also doesn't require us to create a new
> configuration
> > > key,
> > > > > which is nice.
> > > > >
> > > > > Another option is to just do the metadata fetch every
> > > metadata.max.age.ms,
> > > > > but not expire the topic until we can't fetch the metadata for 2 *
> > > > > metadata.max.age.ms.
> > > > >
> > > >
> > > > I'd expect two seconds to be reasonable in the common case. Keep in
> > mind
> > > > that this doesn't affect correctness, and a control operation
> returning
> > > > cached metadata should be on the order of milliseconds.
> > > >
> > >
> > > Hi Brian,
> > >
> > > Thanks again for the KIP.
> > >
> > > I think the issue here is not the common case, but the uncommon case
> > where
> > > the metadata fetch takes longer than expected.  In that case, we don't
> > want
> > > to be in the position of having our metadata expire because we waited
> too
> > > long to renew it.
> > >
> > > This is one reason why I think that the metadata expiration time should
> > be
> > > longer than the metadata refresh time.  In fact, it might be worth
> having
> > > two separate configuration keys for these two values.  I could imagine
> a
> > > user who is having trouble with metadata expiration wanting to increase
> > the
> > > metadata expiration time, but without increasing the metadata refresh
> > > period.  In a sense, the metadata expiration time is like the ZK
> session
> > > expiration time.  You might want to turn it up if the cluster is
> > > experiencing load spikes.
> > >
> > > >
> > > > But to the general
> > > > point, defining the algorithm would mean enforcing it to fair
> accuracy,
> > > > whereas if the suggestion is that it'll be performed at a reasonable
> > > time,
> > > > it allows for batching and other optimizations. Perhaps I shouldn't
> be
> > > > regarding what's defined in a KIP to be contractual in these cases,
> but
> > > you
> > > > could consider a first implementation to collect topics whose
> metadata
> > > has
> > > > exceeded (metadata.max.age.ms / 2), and sending the batch once a
> > > > constituent topic's metadata is near the expiry, or a sufficient
> number
> > > of
> > > > topics have been collected (10? 100? 1000?).
> > > >
> > >
> > > I'm concerned that if we change the metadata caching strategy without
> > > discussing it first, it may improve certain workloads but make others
> > > worse.  We need to be concrete about what the proposed strategy is so
> > that
> > > we can really evaluate it.
> > >
> > > >
> > > >
> > > > > We should be specific about what happens if the first few metadata
> > > fetches
> > > > > fail.  Do we use exponential backoff to decide when to resend?  It
> > > seems
> > > > > like we really should, for all the usual reasons (reduce the load
> on
> > > > > brokers, ride out temporary service disruptions, etc.)  Maybe we
> > could
> > > have
> > > > > an exponential retry backoff for each broker (in other words, we
> > > should try
> > > > > to contact a different broker before applying the backoff.)  I
> think
> > > this
> > > > > already sort of happens with the disconnect timeout, but we might
> > need
> > > a
> > > > > more general solution.
> > > > >
> > > >
> > > > I don't plan to change this behavior. Currently it retries after a
> > fixed
> > > > value of 'retry.backoff.ms' (defaults to 100 ms). It's possible that
> > > > different brokers are attempted, but I haven't dug into it.
> > > >
> > >
> > > I think it's critical to understand what the current behavior is before
> > we
> > > try to change it.  The difference between retrying the same broker and
> > > trying a different one has a large impact it has on cluster load and
> > > latency.  For what it's worth, I believe the behavior is the second
> one,
> > > but it has been a while since I checked.  Let's figure this out.
> > >
> > > >
> > > > > Thanks for the clarification.  Fully asynchronous is the way to
> go, I
> > > > > agree.  I'm having trouble understanding how timeouts are handled
> in
> > > the
> > > > > KIP.  It seems like if we can't fetch the metadata within the
> > > designated
> > > > > metadata timeout, the future / callback should receive a
> > > TimeoutException
> > > > > right?  We do not want the send call to be deferred forever if
> > metadata
> > > > > can't be fetched.  Eventually it should fail if it can't be
> > performed.
> > > > >
> > > > > I do think this is something that will have to be mentioned in the
> > > > > compatibility section.  There is some code out there that is
> probably
> > > > > prepared to handle a timeout exception from the send function,
> which
> > > may
> > > > > need to be updated to check for a timeout from the future or
> > callback.
> > > > >
> > > >
> > > > Correct, a timeout exception would be delivered in the future. Sure,
> I
> > > can
> > > > add that note to the KIP.
> > > >
> > >
> > > Thanks.
> > >
> > > best,
> > > Colin
> > >
> > > >
> > > >
> > > > > It seems like this is an existing problem.  You may fire off a lot
> of
> > > send
> > > > > calls that get blocked because the broker that is the leader for a
> > > certain
> > > > > partition is not responding.  I'm not sure that we need to do
> > anything
> > > > > special here.  On the other hand, we could make the case for a
> > generic
> > > "max
> > > > > number of outstanding sends" configuration to prevent surprise OOMs
> > in
> > > the
> > > > > existing cases, plus the new one we're adding.  But this feels
> like a
> > > bit
> > > > > of a scope expansion.
> > > > >
> > > >
> > > > Right, this is an existing problem, however the asynchronous send
> could
> > > > cause unexpected behavior. For example, if a client pinned
> > > > topics/partitions to individual send threads, then memory couldn't be
> > > > exhausted by a single topic since a blocking send would prevent
> further
> > > > records from being buffered on that topic. The compromise could be
> that
> > > we
> > > > only ever permit one outstanding record batch for a topic, which
> would
> > > keep
> > > > the code simple and wouldn't permit a single topic to consume all
> > > available
> > > > memory.
> > > >
> > > >
> > > >
> > > > > They may be connected, but I'm not sure they should be the same.
> > > Perhaps
> > > > > expiry should be 4x the typical fetch rate, for example.
> > > > >
> > > >
> > > > That's true. You could also make the case for a faster expiry than
> > > refresh
> > > > as well. Makes sense to separate this out.
> > > >
> > > >
> > > >
> > > > > Hmm.... are you sure this is an N^2 problem?  If you have T1 and
> T2,
> > > you
> > > > > fetch metadata for T1 and T2, right?  I guess you could argue that
> we
> > > often
> > > > > fetch metadata for partitions we don't care about, but that doesn't
> > > make it
> > > > > O(N^2).  Maybe there's something about the implementation that I'm
> > > missing.
> > > > >
> > > >
> > > > My apologies, I left out the context. One issue the KIP is trying to
> > > > resolve is the metadata storm that's caused by producers starting up.
> > In
> > > > the worst case, where the send call is only performed from a single
> > > thread
> > > > (i.e. no possible batching), fetching metadata for 1K topics will
> > > generate
> > > > 1K RPCs, with payload 1+2+...+1K topics' metadata. Being smart about
> > the
> > > > topics being refreshed would still generate 1K RPCs for 1 topic each,
> > and
> > > > asynchronous behavior would permit batching (note steady-state
> > refreshing
> > > > doesn't require the asynchronous behavior to batch).
> > > >
> > > >
> > > >
> > > > > In general, we need to take advantage of batching to do this well
> > (one
> > > > > reason why I think we should steer clear of ultra-granular timeout
> > > > > tracking).  It's best to do 1 RPC asking for 10 topics, not 10 RPCs
> > > asking
> > > > > for a single topic each, even if that means some of the topic
> > timeouts
> > > are
> > > > > not *exactly* aligned with the configured value.
> > > > >
> > > >
> > > > Absolutely, agreed.
> > > >
> > > > Thanks,
> > > > Brian
> > > >
> > > >
> > > > > best,
> > > > > Colin
> > > > >
> > > > >
> > > > > >
> > > > > > Thanks,
> > > > > > Brian
> > > > > >
> > > > > > On Mon, Nov 11, 2019 at 11:47 AM Colin McCabe <
> cmcc...@apache.org>
> > > > > wrote:
> > > > > >
> > > > > > > Hi Brian,
> > > > > > >
> > > > > > > Thanks for the KIP.
> > > > > > >
> > > > > > > Starting the metadata fetch before we need the result is
> > > definitely a
> > > > > > > great idea.  This way, the metadata fetch can be done in
> parallel
> > > with
> > > > > all
> > > > > > > the other stuff e producer is doing, rather than forcing the
> > > producer
> > > > > to
> > > > > > > periodically come to a halt periodically while metadata is
> > fetched.
> > > > > > >
> > > > > > > Maybe I missed it, but there seemed to be some details missing
> > > here.
> > > > > When
> > > > > > > do we start the metadata fetch?  For example, if topic metadata
> > > expires
> > > > > > > every 5 minutes, perhaps we should wait 4 minutes, then
> starting
> > > > > fetching
> > > > > > > the new metadata, which we would expect to arrive by the 5
> minute
> > > > > > > deadline.  Or perhaps we should start the fetch even earlier,
> > > around
> > > > > the
> > > > > > > 2.5 minute mark.  In any case, there should be some discussion
> > > about
> > > > > what
> > > > > > > the actual policy is.  Given that metadata.max.age.ms is
> > > configurable,
> > > > > > > maybe that policy  needs to be expressed in terms of a
> percentage
> > > of
> > > > > the
> > > > > > > refresh period rather than in terms of an absolute delay.
> > > > > > >
> > > > > > > The KIP correctly points out that the current metadata fetching
> > > policy
> > > > > > > causes us to "[block] in a function that's advertised as
> > > asynchronous."
> > > > > > > However, the KIP doesn't seem to spell out whether we will
> > > continue to
> > > > > > > block if metadata can't be found, or if this will be abolished.
> > > > > Clearly,
> > > > > > > starting the metadata fetch early will reduce blocking in the
> > > common
> > > > > case,
> > > > > > > but will there still be blocking in the uncommon case where the
> > > early
> > > > > fetch
> > > > > > > doesn't succeed in time?
> > > > > > >
> > > > > > >  > To address (2), the producer currently maintains an expiry
> > > threshold
> > > > > > > for
> > > > > > >  > every topic, which is used to remove a topic from the
> working
> > > set
> > > > > at a
> > > > > > >  > future time (currently hard-coded to 5 minutes, this should
> be
> > > > > modified
> > > > > > > to
> > > > > > >  > use metadata.max.age.ms). While this does work to reduce
> the
> > > size
> > > > > of
> > > > > > > the
> > > > > > >  > topic working set, the producer will continue fetching
> > metadata
> > > for
> > > > > > > these
> > > > > > >  > topics in every metadata request for the full expiry
> duration.
> > > This
> > > > > > > logic
> > > > > > >  > can be made more intelligent by managing the expiry from
> when
> > > the
> > > > > topic
> > > > > > >  > was last used, enabling the expiry duration to be reduced to
> > > improve
> > > > > > > cases
> > > > > > >  > where a large number of topics are touched intermittently.
> > > > > > >
> > > > > > > Can you clarify this part a bit?  It seems like we have a
> > metadata
> > > > > > > expiration policy now for topics, and we will have one after
> this
> > > KIP,
> > > > > but
> > > > > > > they will be somewhat different.  But it's not clear to me what
> > the
> > > > > > > differences are.
> > > > > > >
> > > > > > > In general, if load is a problem, we should probably consider
> > > adding
> > > > > some
> > > > > > > kind of jitter on the client side.  There are definitely cases
> > > where
> > > > > people
> > > > > > > start up a lot of clients at the same time in parallel and
> there
> > > is a
> > > > > > > thundering herd problem with metadata updates.  Adding jitter
> > would
> > > > > spread
> > > > > > > the load across time rather than creating a spike every 5
> minutes
> > > in
> > > > > this
> > > > > > > case.
> > > > > > >
> > > > > > > best,
> > > > > > > Colin
> > > > > > >
> > > > > > >
> > > > > > > On Fri, Nov 8, 2019, at 08:59, Ismael Juma wrote:
> > > > > > > > I think this KIP affects when we block which is actually user
> > > visible
> > > > > > > > behavior. Right?
> > > > > > > >
> > > > > > > > Ismael
> > > > > > > >
> > > > > > > > On Fri, Nov 8, 2019, 8:50 AM Brian Byrne <
> bby...@confluent.io>
> > > > > wrote:
> > > > > > > >
> > > > > > > > > Hi Guozhang,
> > > > > > > > >
> > > > > > > > > Regarding metadata expiry, no access times other than the
> > > initial
> > > > > > > lookup[1]
> > > > > > > > > are used for determining when to expire producer metadata.
> > > > > Therefore,
> > > > > > > > > frequently used topics' metadata will be aged out and
> > > subsequently
> > > > > > > > > refreshed (in a blocking manner) every five minutes, and
> > > > > infrequently
> > > > > > > used
> > > > > > > > > topics will be retained for a minimum of five minutes and
> > > currently
> > > > > > > > > refetched on every metadata update during that time period.
> > The
> > > > > > > sentence is
> > > > > > > > > suggesting that we could reduce the expiry time to improve
> > the
> > > > > latter
> > > > > > > > > without affecting (rather slightly improving) the former.
> > > > > > > > >
> > > > > > > > > Keep in mind that in most all cases, I wouldn't anticipate
> > > much of
> > > > > a
> > > > > > > > > difference with producer behavior, and the extra logic can
> be
> > > > > > > implemented
> > > > > > > > > to have insignificant cost. It's the large/dynamic topic
> > corner
> > > > > cases
> > > > > > > that
> > > > > > > > > we're trying to improve.
> > > > > > > > >
> > > > > > > > > It'd be convenient if the KIP is no longer necessary.
> You're
> > > right
> > > > > in
> > > > > > > that
> > > > > > > > > there's no public API changes and the behavioral changes
> are
> > > > > entirely
> > > > > > > > > internal. I'd be happy to continue the discussion around
> the
> > > KIP,
> > > > > but
> > > > > > > > > unless otherwise objected, it can be retired.
> > > > > > > > >
> > > > > > > > > [1] Not entirely accurate, it's actually the first time
> when
> > > the
> > > > > client
> > > > > > > > > calculates whether to retain the topic in its metadata.
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > > Brian
> > > > > > > > >
> > > > > > > > > On Thu, Nov 7, 2019 at 4:48 PM Guozhang Wang <
> > > wangg...@gmail.com>
> > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Hello Brian,
> > > > > > > > > >
> > > > > > > > > > Could you elaborate a bit more on this sentence: "This
> > logic
> > > can
> > > > > be
> > > > > > > made
> > > > > > > > > > more intelligent by managing the expiry from when the
> topic
> > > was
> > > > > last
> > > > > > > > > used,
> > > > > > > > > > enabling the expiry duration to be reduced to improve
> cases
> > > > > where a
> > > > > > > large
> > > > > > > > > > number of topics are touched intermittently." Not sure I
> > > fully
> > > > > > > understand
> > > > > > > > > > the proposal.
> > > > > > > > > >
> > > > > > > > > > Also since now this KIP did not make any public API
> changes
> > > and
> > > > > the
> > > > > > > > > > behavioral changes are not considered a public API
> contract
> > > (i.e.
> > > > > > > how we
> > > > > > > > > > maintain the topic metadata in producer cache is never
> > > committed
> > > > > to
> > > > > > > > > users),
> > > > > > > > > > I wonder if we still need a KIP for the proposed change
> any
> > > more?
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > Guozhang
> > > > > > > > > >
> > > > > > > > > > On Thu, Nov 7, 2019 at 12:43 PM Brian Byrne <
> > > bby...@confluent.io
> > > > > >
> > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > Hello all,
> > > > > > > > > > >
> > > > > > > > > > > I'd like to propose a vote for a producer change to
> > improve
> > > > > > > producer
> > > > > > > > > > > behavior when dealing with a large number of topics, in
> > > part by
> > > > > > > > > reducing
> > > > > > > > > > > the amount of metadata fetching performed.
> > > > > > > > > > >
> > > > > > > > > > > The full KIP is provided here:
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > >
> > > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-526%3A+Reduce+Producer+Metadata+Lookups+for+Large+Number+of+Topics
> > > > > > > > > > >
> > > > > > > > > > > And the discussion thread:
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > >
> > > > >
> > >
> >
> https://lists.apache.org/thread.html/b2f8f830ef04587144cf0840c7d4811bbf0a14f3c459723dbc5acf9e@%3Cdev.kafka.apache.org%3E
> > > > > > > > > > >
> > > > > > > > > > > Thanks,
> > > > > > > > > > > Brian
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > --
> > > > > > > > > > -- Guozhang
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Reply via email to