Thanks for reaching out to the community about this. I think (as maybe
you've also suggested) it is really an observation about the producer
client rather than the Streams client. Generally speaking, the question is
whether we can fail fast when the metadata cannot be found during the
producer.send() call. Here are my thoughts:

1) Caching the metadata outside the producer, e.g. in an admin client,
would not be a perfect solution, since either way the metadata cache
(whether inside the producer or inside the admin client) is not guaranteed
to always be up to date: e.g. you may decide to fail a record on send
because its topic was not in the cache, and then one second later the
metadata gets refreshed and now contains that topic. See the sketch right
after this paragraph.
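
To make that race concrete, here is a minimal sketch of the admin-client
cache approach (the class name, field names, and refresh interval are all
just illustrative): a background task periodically refreshes a set of
known topic names, and a topic created between two refreshes would be
wrongly rejected until the next refresh.

    import java.util.Set;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    import org.apache.kafka.clients.admin.Admin;

    // Illustrative only: a periodically refreshed topic-name cache.
    public class TopicCache implements AutoCloseable {
        private final Admin admin;
        private volatile Set<String> knownTopics = Set.of();
        private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

        public TopicCache(Admin admin, long refreshSeconds) {
            this.admin = admin;
            scheduler.scheduleAtFixedRate(
                this::refresh, 0, refreshSeconds, TimeUnit.SECONDS);
        }

        private void refresh() {
            try {
                // listTopics() returns the names currently known to the brokers
                knownTopics = admin.listTopics().names().get();
            } catch (Exception e) {
                // keep serving the stale cache on a failed refresh
            }
        }

        // NOTE: inherently racy -- the topic may have been created
        // since the last refresh
        public boolean exists(String topic) {
            return knownTopics.contains(topic);
        }

        @Override
        public void close() {
            scheduler.shutdownNow();
        }
    }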

2) Letting the send() call fail with an UnknownTopicOrPartitionException
and pushing the burden onto the caller to decide what to do (either wait
and retry, or give up and stop the world, etc.) may work, but that requires
modifying the interface semantics, or at least adding an overloaded
"send()" function. Maybe worth discussing in a KIP; see the hypothetical
sketch below.

3) For your specific case, if you believe the metadata should be static
and never change (i.e. you assume all topics are pre-created and none will
be created later), then I think setting max.block.ms to a smaller value and
just catching TimeoutException is fine: for send() itself, max.block.ms is
only used for metadata refresh and for buffer allocation when the buffer is
full, and the latter should be a rare case assuming you set buffer.memory
reasonably large. But note that since max.block.ms is a global config, it
also affects other blocking calls, such as the txn-related ones. A minimal
sketch follows.
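
For illustration, here is a minimal sketch of that setup (the bootstrap
servers, topic name, and the one-second timeout are placeholders):

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.errors.TimeoutException;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class FailFastSend {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class);
            // fail fast: wait at most one second for metadata (or buffer space)
            props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 1000);
            // keep the buffer large so max.block.ms is effectively
            // only hit on metadata
            props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 64L * 1024 * 1024);

            try (KafkaProducer<String, String> producer =
                     new KafkaProducer<>(props)) {
                try {
                    producer.send(
                        new ProducerRecord<>("maybe-missing-topic", "k", "v"));
                } catch (TimeoutException e) {
                    // with pre-created topics, a metadata timeout almost
                    // certainly means the topic does not exist: log and drop
                    System.err.println("Dropping record: " + e.getMessage());
                }
            }
        }
    }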

On Wed, Jul 1, 2020 at 6:10 PM Rhys Anthony McCaig <mcc...@gmail.com> wrote:

> Hi All,
> I have recently been working on a streams application that uses a
> TopicNameExtractor to dynamically route records based on the payload. This
> streams application is used by various other applications, and occasionally
> these other applications request that a record be sent to a non-existent
> topic - rather than have the topic created, the message should be logged
> and dropped.
> Unfortunately, I don't seem to have found a reliable way to implement this
> behaviour: I originally hoped to be able to catch these scenarios in a
> ProductionExceptionHandler by catching an
> UnknownTopicOrPartitionException, however the current producer behaviour
> is to wait for max.block.ms in waitOnMetadata() for partitions to be
> returned for the topic before throwing a TimeoutException. If after
> refreshing metadata there are still no partitions for the requested topic,
> it will continue to request an update until the timeout is reached: (
> https://github.com/apache/kafka/blob/b8a99be7847c61d7792689b71fda5b283f8340a8/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L1051
> )
> For my use case, there are two challenges here:
> 1. ProductionExceptionHandler must catch TimeoutException and inspect the
> exception message to determine that the exception was caused by not
> finding the topic in the metadata
> 2. The streams task blocks (as expected) while the producer is fetching
> metadata, holding up processing of other records, until the timeout
> exception is thrown.
> Rather than accept the stream blocking in this scenario, my current
> thinking is to use AdminClient to keep a periodically updated cache of
> existing/non-existent topics and filter based on this - however I can't
> help thinking that this feels clunky, given the producer maintains its own
> cache of recently accessed topics/partitions.
> Would it make sense to enhance KafkaProducer to:
> - Optionally fail fast when the first metadata refresh does not return the
> requested topic, or partition count? (And maybe even optionally cache
> this?)
> - Differentiate between a TimeoutException and an
> UnknownTopicOrPartitionException?
> My understanding of the internals isn't great - I'm not clear on the
> reason to continue requesting metadata updates after getting a new version
> - is there a possible issue with getting stale metadata from brokers?
> Looking forward to your thoughts!

-- Guozhang
