Sorry, I meant `CompletionStage` (instead of `CompletableFuture`), as that is the interface.
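
For illustration, a sketch of what a `CompletionStage`-based send could look like; the method name and signature here are hypothetical, not part of any current API:

    // Hypothetical async variant: CompletionStage exposes callback chaining
    // to callers without letting them complete the stage themselves.
    CompletionStage<RecordMetadata> sendAsync(ProducerRecord<K, V> record);

    // Callers could then react on completion, e.g.:
    producer.sendAsync(record).whenComplete((metadata, error) -> {
        if (error != null)
            error.printStackTrace();                // send failed
        else
            System.out.println(metadata.offset());  // acked by the broker
    });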

Best,
———
Josep Prat

Aiven Deutschland GmbH

Immanuelkirchstraße 26, 10405 Berlin

Amtsgericht Charlottenburg, HRB 209739 B

Geschäftsführer: Oskari Saarenmaa & Hannu Valtonen

m: +491715557497

w: aiven.io

e: josep.p...@aiven.io

On Wed, May 26, 2021, 15:36 Josep Prat <josep.p...@aiven.io> wrote:

> Hi,
> If I may, I would like to suggest using `CompletableFuture` instead of Java's `Future` interface in the APIs. This would, for example, offer the possibility of applying callbacks on completion.
>
> Best,
>
> On Wed, May 26, 2021 at 3:28 PM Matthew de Detrich
> <matthew.dedetr...@aiven.io.invalid> wrote:
>
>> Maybe there was a miscommunication, but I agree with everything you said; I was just clarifying my definition of blocking (because I think there was a misunderstanding).
>>
>> And yes, you are right: there is a limited number of threads, which is why blocking is a bad thing; having threads sitting around waiting and not doing anything is a waste of resources. Ultimately it is also a performance problem, because if you don't block you can simply process more IO tasks on a given machine/instance (hence greater performance).
>>
>> In any case, as has been clarified, the current behavior of send() needs to be fixed. It returns a Future, but since it internally blocks on the caller's thread, from an API perspective it gives the incorrect impression that it's asynchronous (when it's not).
>>
>> On Wed, May 26, 2021 at 3:15 PM Ryanne Dolan <ryannedo...@gmail.com>
>> wrote:
>>
>> > Matthew, it's more than performance though. In many frameworks the number of request threads is purposefully constrained, and blocking one means you have one less to handle requests with. When you're handling a large number of requests with a small number of threads, any blocking can lead to thread exhaustion.
>> >
>> > For this reason, you'll often see send() wrapped in a future or thread pool. But it's surprising that this would be required, since send() already returns a future.
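>> >
>> > (For illustration, a sketch of that common wrapping pattern, assuming an existing `producer` and `record` and a dedicated pool; the point is that both the enqueue and the wait happen off the request thread:)
>> >
>> >     ExecutorService sendPool = Executors.newFixedThreadPool(4);
>> >     CompletableFuture<RecordMetadata> result = CompletableFuture.supplyAsync(() -> {
>> >         try {
>> >             return producer.send(record).get(); // any blocking happens on sendPool, not the caller
>> >         } catch (InterruptedException | ExecutionException e) {
>> >             throw new CompletionException(e);   // surface failure through the future
>> >         }
>> >     }, sendPool);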
>> >
>> > Additionally, even when send() does not actually block, it does a lot of work on the caller's thread, which is likewise surprising given that a future is returned. The effect is the same: fewer threads are available to handle requests, and you risk thread exhaustion.
>> >
>> > I think we may incidentally improve performance if we introduce an internal thread pool, but the primary motivation here, at least for me, is to fix the lie the API is telling, not to improve performance.
>> >
>> > Ryanne
>> >
>> >
>> >
>> > On Wed, May 26, 2021, 6:51 AM Matthew de Detrich
>> > <matthew.dedetr...@aiven.io.invalid> wrote:
>> >
>> > > I think we may need to clarify terminology here. At least to me, blocking means suspending the current thread to wait for some operation (which is wasteful if we are dealing with IO-bound tasks). In other words, the "blocking" is an implementation detail of how to wait, rather than whether we need to wait at all, so to me this is more of a performance question.
>> > >
>> > > In the scenario you describe of Kafka clients producing too many messages, buffering is, as you said, what should be done, but I wouldn't classify this as blocking.
>> > >
>> > > On Mon, May 24, 2021 at 7:54 PM Colin McCabe <cmcc...@apache.org>
>> wrote:
>> > >
>> > > > Hi all,
>> > > >
>> > > > I agree that we should give users the option of having a fully async API, but I don't think external thread pools or queues are the right direction to go here. They add performance overheads and don't address the root causes of the problem.
>> > > >
>> > > > There are basically two scenarios where we block, currently. One is when we are doing a metadata fetch. I think this is clearly a bug, or at least an implementation limitation. From the user's point of view, the fact that we are doing a metadata fetch is an implementation detail that really shouldn't be exposed like this. We have talked about fixing this in the past. I think we should just spend the time to do it.
>> > > >
>> > > > The second scenario is where the client has produced too much data in too little time. This could happen if there is a network glitch, or if the server is slower than expected. In this case, the behavior is intentional and not a bug. To understand this, think about what would happen if we didn't block. We would start buffering more and more data in memory, until finally the application died with an out-of-memory error. That would be frustrating for users and wouldn't add to the usability of Kafka.
>> > > >
>> > > > We could potentially have an option to handle the out-of-memory scenario differently, by returning an error code immediately rather than blocking. Applications would have to be rewritten to handle this properly, but it is a possibility. I suspect that most of them wouldn't use it, but we could offer it as a possibility for async purists (which might include certain frameworks). The big problem users would have to solve is what to do with the record that they were unable to produce due to the buffer-full issue.
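>> > > >
>> > > > (As a sketch of that fail-fast option: today, setting max.block.ms to 0 approximates it, though that setting also affects metadata waits, and depending on the client version the error surfaces either as a synchronous throw or as a failed future:)
>> > > >
>> > > >     Properties props = new Properties();
>> > > >     props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "0"); // fail fast instead of blocking
>> > > >     // ...build the producer with these props, then:
>> > > >     try {
>> > > >         producer.send(record); // full buffer -> BufferExhaustedException
>> > > >     } catch (BufferExhaustedException e) {
>> > > >         // caller decides: retry later, drop the record, or push back upstream
>> > > >     }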
>> > > >
>> > > > best,
>> > > > Colin
>> > > >
>> > > >
>> > > > On Thu, May 20, 2021, at 10:35, Nakamura wrote:
>> > > > > >
>> > > > > > My suggestion was just to do this in multiple steps/phases: first let's fix the issue of send being misleadingly asynchronous (i.e. internally it's blocking), and then later on we can make the various threadpools configurable with a sane default.
>> > > > >
>> > > > > I like that approach. I updated the "Which thread should be responsible for waiting" part of KIP-739 to add your suggestion as my recommended approach, thank you!  If no one else has major concerns about that approach, I'll move the alternatives to "rejected alternatives".
>> > > > >
>> > > > > On Thu, May 20, 2021 at 7:26 AM Matthew de Detrich
>> > > > > <matthew.dedetr...@aiven.io.invalid> wrote:
>> > > > >
>> > > > > > @Nakamura
>> > > > > > On Wed, May 19, 2021 at 7:35 PM Nakamura <nny...@gmail.com> wrote:
>> > > > > >
>> > > > > > > @Ryanne:
>> > > > > > > In my mind's eye I slightly prefer throwing the "cannot enqueue" exception to satisfying the future immediately with the "cannot enqueue" exception. But I agree, it would be worth doing more research.
>> > > > > > >
>> > > > > > > @Matthew:
>> > > > > > >
>> > > > > > > > 3. Using multiple thread pools is definitely recommended for different types of tasks; for serialization, which is CPU-bound, you definitely would want to use a bounded thread pool that is fixed by the number of CPUs (or something along those lines).
>> > > > > > > > https://gist.github.com/djspiewak/46b543800958cf61af6efa8e072bfd5c is a very good guide on this topic
>> > > > > > > I think this guide is good in general, but I would be hesitant to follow its guidance re: offloading serialization without benchmarking it.  My understanding is that context switches have gotten much cheaper, and that gains from cache locality are small, but they're not nothing.  Especially if the workload has a very small serialization cost, I wouldn't be shocked if offloading made it slower.  I feel pretty strongly that we should do more research here before unconditionally encouraging serialization in a threadpool.  If people think it's important to do it here (e.g. if we think it would mean another big API change) then we should start thinking about what benchmarking we can do to gain higher confidence in this kind of change.  However, I don't think it would change semantics as substantially as what we're proposing here, so I would vote for pushing this to a subsequent KIP.
>> > > > > > >
>> > > > > > Of course, it's all down to benchmarking, benchmarking and benchmarking. Ideally speaking, you want to use all of the resources available to you, so if you have a bottleneck in serialization and you have many cores free, then using multiple cores may be more appropriate than a single core. Typically I would expect that using a single thread to do serialization is likely to be the most common situation; I was just responding to an earlier point that was made in regards to using ThreadPools for serialization (note that you can also just use a ThreadPool that is pinned to a single thread).
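>> > > > > >
>> > > > > > (A sketch of the pools being discussed; note that plain Java gives you a single-thread pool, while true pinning to a core needs OS-level thread affinity:)
>> > > > > >
>> > > > > >     int cores = Runtime.getRuntime().availableProcessors();
>> > > > > >     ExecutorService cpuPool = Executors.newFixedThreadPool(cores); // bounded pool sized by CPU count
>> > > > > >     ExecutorService single = Executors.newSingleThreadExecutor();  // pool pinned to one thread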
>> > > > > >
>> > > > > >
>> > > > > >
>> > > > > > >
>> > > > > > > > 4. Regarding providing the ability for users to supply their own custom ThreadPool, this is more of an ergonomics question for the API. Especially when it gets to monitoring/tracing, giving users the ability to provide their own custom IO/CPU ThreadPools is ideal; however, as stated, doing so means a lot of boilerplate changes to the API. Typically speaking, a lot of monitoring/tracing/diagnosing is done on ExecutionContexts/ThreadPools (at least on a more rudimentary level), and hence allowing users to supply a global singleton ThreadPool for IO tasks and another for CPU tasks makes their lives a lot easier. However, due to the large number of changes to the API, it may be more appropriate to just use internal thread pools (for now), since at least that's not any worse than what exists currently, and this can be an improvement that is done later?
>> > > > > > > Is there an existing threadpool that you suggest we reuse?  Or are you imagining that we make our own internal threadpool, and then maybe expose configuration flags to manipulate it?  For what it's worth, I like having an internal threadpool (perhaps just ForkJoinPool.commonPool()) and then providing an alternative to pass your own threadpool.  That way people who want finer control can get it, and everyone else can do OK with the default.
>> > > > > > >
>> > > > > > Indeed, that is what I am saying. The ideal situation is that there is a default internal threadpool that Kafka uses, but users of Kafka can configure their own threadpool. Having a singleton ThreadPool for blocking IO, non-blocking IO and CPU-bound tasks which can be plugged into all of your libraries (including Kafka) makes resource management much easier to do, and also gives users control to override specific threadpools for exceptional cases (i.e. providing a ThreadPool that is pinned to a single core, which tends to give the best latency results if that is something that is critical for you).
>> > > > > >
>> > > > > > My suggestion was just to do this in multiple steps/phases: first let's fix the issue of send being misleadingly asynchronous (i.e. internally it's blocking), and then later on we can make the various threadpools configurable with a sane default.
>> > > > > >
>> > > > > > >
>> > > > > > >
>> > > > > > > On Wed, May 19, 2021 at 6:01 AM Matthew de Detrich
>> > > > > > > <matthew.dedetr...@aiven.io.invalid> wrote:
>> > > > > > >
>> > > > > > > > Here are my two cents (note that I am only looking at this on a surface level):
>> > > > > > > >
>> > > > > > > > 1. If we are going down this road, it makes sense to do this "properly" (i.e. using queues, as Ryanne suggested). The reason I am saying this is that the original goal of the KIP seems to be for it to be used in other asynchronous systems, and from my personal experience you really do need to make the implementation properly asynchronous, otherwise it's really not that useful.
>> > > > > > > > 2. Due to the previous point and what was said by others, this is likely going to break some existing semantics (i.e. people are currently relying on blocking semantics), so adding another method/interface plus deprecating the older one is more annoying but ideal.
>> > > > > > > > 3. Using multiple thread pools is definitely recommended for different types of tasks; for serialization, which is CPU-bound, you definitely would want to use a bounded thread pool that is fixed by the number of CPUs (or something along those lines).
>> > > > > > > >
>> > > > > > > > https://gist.github.com/djspiewak/46b543800958cf61af6efa8e072bfd5c is a very good guide on this topic
>> > > > > > > > 4. Regarding providing the ability for users to supply their own custom ThreadPool, this is more of an ergonomics question for the API. Especially when it gets to monitoring/tracing, giving users the ability to provide their own custom IO/CPU ThreadPools is ideal; however, as stated, doing so means a lot of boilerplate changes to the API. Typically speaking, a lot of monitoring/tracing/diagnosing is done on ExecutionContexts/ThreadPools (at least on a more rudimentary level), and hence allowing users to supply a global singleton ThreadPool for IO tasks and another for CPU tasks makes their lives a lot easier. However, due to the large number of changes to the API, it may be more appropriate to just use internal thread pools (for now), since at least that's not any worse than what exists currently, and this can be an improvement that is done later?
>> > > > > > > >
>> > > > > > > > On Wed, May 19, 2021 at 2:56 AM Ryanne Dolan <
>> > > > ryannedo...@gmail.com>
>> > > > > > > > wrote:
>> > > > > > > >
>> > > > > > > > > I was thinking the sender would typically wrap send() in a backoff/retry loop, or else ignore any failures and drop sends on the floor (fire-and-forget), and in both cases I think failing immediately is better than blocking for a new spot in the queue or asynchronously failing somehow.
>> > > > > > > > >
>> > > > > > > > > I think a failed future is adequate for the "explicit backpressure signal" while avoiding any blocking anywhere. I think if we try to asynchronously signal the caller of failure (either by asynchronously failing the future or invoking a callback off-thread or something) we'd force the caller to either block or poll waiting for that signal, which somewhat defeats the purpose we're after. And of course blocking for a spot in the queue definitely defeats the purpose (though it perhaps ameliorates the problem some).
>> > > > > > > > >
>> > > > > > > > > Throwing an exception to the caller directly (not via the future) is another option with precedent in Kafka clients, though it doesn't seem as ergonomic to me.
>> > > > > > > > >
>> > > > > > > > > It would be interesting to analyze some existing usage and determine how difficult it would be to convert it to the various proposed APIs.
>> > > > > > > > >
>> > > > > > > > > Ryanne
>> > > > > > > > >
>> > > > > > > > > On Tue, May 18, 2021, 3:27 PM Nakamura <nny...@gmail.com>
>> > > wrote:
>> > > > > > > > >
>> > > > > > > > > > Hi Ryanne,
>> > > > > > > > > >
>> > > > > > > > > > Hmm, that's an interesting idea.  Basically it would mean that after calling send, you would also have to check whether the returned future had failed with a specific exception.  I would be open to it, although I think it might be slightly more surprising, since right now the paradigm is "enqueue synchronously; the future represents whether we succeeded in sending or not", and the new one would be "enqueue synchronously; the future either represents whether we succeeded in enqueueing or not (in which case it will be failed immediately if it failed to enqueue) or whether we succeeded in sending or not".
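>> > > > > > > > > >
>> > > > > > > > > > (A sketch of that shape, assuming a hypothetical internal bounded queue; BufferExhaustedException stands in for whatever "cannot enqueue" error we'd pick:)
>> > > > > > > > > >
>> > > > > > > > > >     CompletableFuture<RecordMetadata> f = new CompletableFuture<>();
>> > > > > > > > > >     if (!queue.offer(record)) { // hypothetical bounded queue is full
>> > > > > > > > > >         f.completeExceptionally(new BufferExhaustedException("cannot enqueue"));
>> > > > > > > > > >         return f; // caller gets an already-failed future; nothing blocks
>> > > > > > > > > >     }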
>> > > > > > > > > >
>> > > > > > > > > > But you're right, it should be on the table, thank you for suggesting it!
>> > > > > > > > > >
>> > > > > > > > > > Best,
>> > > > > > > > > > Moses
>> > > > > > > > > >
>> > > > > > > > > > On Tue, May 18, 2021 at 12:23 PM Ryanne Dolan <
>> > > > > > ryannedo...@gmail.com
>> > > > > > > >
>> > > > > > > > > > wrote:
>> > > > > > > > > >
>> > > > > > > > > > > Moses, in the case of a full queue, could we just return a failed future immediately?
>> > > > > > > > > > >
>> > > > > > > > > > > Ryanne
>> > > > > > > > > > >
>> > > > > > > > > > > On Tue, May 18, 2021, 10:39 AM Nakamura <
>> > nny...@gmail.com>
>> > > > > > wrote:
>> > > > > > > > > > >
>> > > > > > > > > > > > Hi Alexandre,
>> > > > > > > > > > > >
>> > > > > > > > > > > > Thanks for bringing this up, I think I could use some feedback in this area.  There are two mechanisms here: one for slowing down when we don't have the relevant metadata, and the other for slowing down when a queue has filled up.  Although the first one applies backpressure somewhat inadvertently, we could still get in trouble if we're not providing information to the mechanism that monitors whether we're queueing too much.  As for the second one, that is a classic backpressure use case, so it's definitely important that we don't drop that ability.
>> > > > > > > > > > > >
>> > > > > > > > > > > > Right now backpressure is applied by blocking, which is a natural way to apply backpressure in synchronous systems, but it can lead to unnecessary slowdowns in asynchronous systems.  In my opinion, the safest way to apply backpressure in an asynchronous model is to have an explicit backpressure signal.  A good example would be returning an exception, and providing an optional hook to add a callback onto, so that you can be notified when it's ready to accept more messages.
>> > > > > > > > > > > >
>> > > > > > > > > > > > However, this would be a really big change to how users use KafkaProducer#send, so I don't know how much appetite we have for making that kind of change.  Maybe it would be simpler to remove the "don't block when the per-topic queue is full" part from the scope of this KIP, and only focus on when metadata is available?  The downside is that we will probably want to change the API again later to fix this, so it might be better to just rip the bandaid off now.
>> > > > > > > > > > > >
>> > > > > > > > > > > > One slightly nasty thing here is that because queueing order is important, if we want to use exceptions, we will want to be able to signal the failure to enqueue to the caller in such a way that they can still enforce message order if they want.  So we can't embed the failure directly in the returned future; we should either return two futures (nested, or as a tuple) or else throw an exception to signal backpressure.
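>> > > > > > > > > > > >
>> > > > > > > > > > > > (For illustration, the "nested futures" shape could look like this; the signature is invented, not part of the KIP:)
>> > > > > > > > > > > >
>> > > > > > > > > > > >     // Outer stage: completes when the record is enqueued,
>> > > > > > > > > > > >     // or fails fast with a backpressure error.
>> > > > > > > > > > > >     // Inner stage: completes when the broker acks the send.
>> > > > > > > > > > > >     CompletionStage<CompletionStage<RecordMetadata>> sendAsync(ProducerRecord<K, V> record);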
>> > > > > > > > > > > >
>> > > > > > > > > > > > So there are a few things we should work out here:
>> > > > > > > > > > > >
>> > > > > > > > > > > > 1. Should we keep the "too many bytes enqueued" part of this in scope?  (I would say yes, so that we can minimize churn in this API.)
>> > > > > > > > > > > > 2. How should we signal backpressure so that it's appropriate for asynchronous systems?  (I would say that we should throw an exception.  If we choose this and we want to pursue the queueing path, we would *not* want to enqueue messages that would push us over the limit; we would only want to enqueue messages when we're waiting for metadata, and we would want to keep track of the total number of bytes for those messages.)
>> > > > > > > > > > > >
>> > > > > > > > > > > > What do you think?
>> > > > > > > > > > > >
>> > > > > > > > > > > > Best,
>> > > > > > > > > > > > Moses
>> > > > > > > > > > > >
>> > > > > > > > > > > > On Sun, May 16, 2021 at 6:16 AM Alexandre Dupriez <
>> > > > > > > > > > > > alexandre.dupr...@gmail.com> wrote:
>> > > > > > > > > > > >
>> > > > > > > > > > > > > Hello Nakamura,
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > Thanks for proposing this change. I can see how the blocking behaviour can be a problem when integrating with reactive frameworks such as Akka. One of the questions I would have is how you would handle backpressure and avoid memory exhaustion when the producer's buffer is full and tasks start to accumulate in the out-of-band queue or thread pool introduced with this KIP.
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > Thanks,
>> > > > > > > > > > > > > Alexandre
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > On Fri, 14 May 2021 at 15:55, Ryanne Dolan <ryannedo...@gmail.com> wrote:
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > Makes sense!
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > Ryanne
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > On Fri, May 14, 2021, 9:39 AM Nakamura <
>> > > > nny...@gmail.com>
>> > > > > > > > wrote:
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > Hey Ryanne,
>> > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > I see what you're saying about serde blocking, but I think we should consider it out of scope for this patch.  Right now we've nailed down a couple of use cases where we can unambiguously say "I can make progress now" or "I cannot make progress now", which makes it possible to offload to a different thread only if we are unable to make progress.  Extending this to CPU work like serde would mean always offloading, which would be a really big performance change.  It might be worth exploring anyway, but I'd rather keep this patch focused on improving ergonomics, rather than muddying the waters with evaluating performance very deeply.
>> > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > I think if we really do want to support serdes or interceptors that do IO on the send path (which seems like an anti-pattern to me), we should consider making that a separate KIP, and probably also consider changing the API to use Futures (or CompletionStages).  But I would rather avoid scope creep, so that we have a better chance of fixing this part of the problem.
>> > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > Yes, I think some exceptions will move to being async instead of sync.  They'll still be surfaced in the Future, so I'm not so confident that it would be that big a shock to users, though.
>> > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > Best,
>> > > > > > > > > > > > > > > Moses
>> > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > On Thu, May 13, 2021 at 7:44 PM Ryanne Dolan <
>> > > > > > > > > > > ryannedo...@gmail.com>
>> > > > > > > > > > > > > > > wrote:
>> > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > Re serialization, my concern is that serialization often accounts for a lot of the cycles spent before returning the future. It's not blocking per se, but it's the same effect from the caller's perspective.
>> > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > Moreover, serde impls often block themselves, e.g. when fetching schemas from a registry. I suppose it's also possible to block in Interceptors (e.g. writing audit events or metrics), which happens before serdes, if I understand correctly. So any blocking in either of those plugins would block the send unless we queue first.
>> > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > So I think we want to queue first and do everything off-thread when using the new API, whatever that looks like. I just want to make sure we don't do that for clients that wouldn't expect it.
>> > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > Another consideration is exception handling. If we queue right away, we'll defer some exceptions that currently are thrown to the caller (before the future is returned). In the new API, send() wouldn't throw any exceptions, and instead the future would fail. I think that might mean that a new method signature is required.
>> > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > Ryanne
>> > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > On Thu, May 13, 2021, 2:57 PM Nakamura <
>> > > > > > > > > > nakamura.mo...@gmail.com
>> > > > > > > > > > > >
>> > > > > > > > > > > > > wrote:
>> > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > Hey Ryanne,
>> > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > I agree we should add an additional constructor (or else an additional overload of KafkaProducer#send, but the new constructor would be easier to understand) if we're targeting the "user provides the thread" approach.
>> > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > From looking at the code, I think we can keep record serialization on the user thread, if we consider that an important part of the semantics of this method.  Serialization doesn't seem to depend on knowing the cluster; I think it's incidental that it comes after the first "blocking" segment in the method.
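>> > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > (A sketch of "serialize on the caller's thread, then go async"; keySerializer/valueSerializer stand for the producer's configured serializers:)
>> > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > >     // Serializer.serialize(topic, data) needs no cluster metadata.
>> > > > > > > > > > > > > > > > >     byte[] keyBytes = keySerializer.serialize(record.topic(), record.key());
>> > > > > > > > > > > > > > > > >     byte[] valueBytes = valueSerializer.serialize(record.topic(), record.value());
>> > > > > > > > > > > > > > > > >     // ...then hand the already-serialized bytes to the async machinery.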
>> > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > Best,
>> > > > > > > > > > > > > > > > > Moses
>> > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > On Thu, May 13, 2021 at 2:38 PM Ryanne
>> Dolan
>> > <
>> > > > > > > > > > > > > ryannedo...@gmail.com>
>> > > > > > > > > > > > > > > > > wrote:
>> > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > Hey Moses, I like the direction here. My thinking is that a single additional work queue, such that send() can enqueue and return, seems like the lightest touch. However, I don't think we can trivially process that queue in an internal thread pool without subtly changing behavior for some users. For example, users will often run send() in multiple threads in order to serialize faster, but that wouldn't work quite the same if there were an internal thread pool.
>> > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > For this reason I'm thinking we need to make sure any such changes are opt-in. Maybe a new constructor with an additional ThreadFactory parameter. That would at least clearly indicate that work will happen off-thread, and would require opt-in for the new behavior.
>> > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > Under the hood, this ThreadFactory could be used to create the worker thread that processes queued sends, which could fan out to per-partition threads from there.
>> > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > So then you'd have two ways to send: the existing way, where serdes and interceptors and whatnot are executed on the calling thread, and the new way, which returns right away and uses an internal Executor. As you point out, the semantics would be identical in either case, and it would be very easy for clients to switch.
>> > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > Ryanne
>> > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > On Thu, May 13, 2021, 9:00 AM Nakamura <
>> > > > > > > > nny...@gmail.com
>> > > > > > > > > >
>> > > > > > > > > > > > wrote:
>> > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > Hey Folks,
>> > > > > > > > > > > > > > > > > > > I just posted a new proposal
>> > > > > > > > > > > > > > > > > > > <https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181306446>
>> > > > > > > > > > > > > > > > > > > in the wiki.  I think we have an opportunity to improve the KafkaProducer#send user experience.  It would certainly make our lives easier.  Please take a look!  There are two subproblems that I could use guidance on, so I would appreciate feedback on both of them.
>> > > > > > > > > > > > > > > > > > > Best,
>> > > > > > > > > > > > > > > > > > > Moses
>> > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > >
>> > > > > > > > > > > > >
>> > > > > > > > > > > >
>> > > > > > > > > > >
>> > > > > > > > > >
>> > > > > > > > >
>> > > > > > > >
>> > > > > > > >
>> > > > > > > >
>> > > > > > >
>> > > > > >
>> > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > >
>> > >
>> > >
>> >
>>
>>
>> --
>>
>> Matthew de Detrich
>>
>> *Aiven Deutschland GmbH*
>>
>> Immanuelkirchstraße 26, 10405 Berlin
>>
>> Amtsgericht Charlottenburg, HRB 209739 B
>>
>> Geschäftsführer: Oskari Saarenmaa & Hannu Valtonen
>>
>> *m:* +491603708037
>>
>> *w:* aiven.io *e:* matthew.dedetr...@aiven.io
>>
>
>
> --
>
> Josep Prat
>
> *Aiven Deutschland GmbH*
>
> Immanuelkirchstraße 26, 10405 Berlin
>
> Amtsgericht Charlottenburg, HRB 209739 B
>
> Geschäftsführer: Oskari Saarenmaa & Hannu Valtonen
>
> *m:* +491715557497
>
> *w:* aiven.io
>
> *e:* josep.p...@aiven.io
>
