Sorry, I meant `CompletionStage` (instead of `CompletableFuture`), as this is the interface.
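For illustration, the async variant could then be declared against the interface; a minimal sketch (the `AsyncSender`/`sendAsync` names are illustrative, not a concrete proposal):

    import java.util.concurrent.CompletionStage;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    // Sketch: exposing CompletionStage rather than CompletableFuture lets
    // callers chain callbacks (thenApply, whenComplete, ...) while
    // preventing them from completing the result themselves.
    public interface AsyncSender<K, V> {
        CompletionStage<RecordMetadata> sendAsync(ProducerRecord<K, V> record);
    }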
Best,
Josep Prat

On Wed, May 26, 2021, 15:36 Josep Prat <josep.p...@aiven.io> wrote:

Hi,

If I may, I would like to suggest that instead of using Java's `Future` class in the APIs, it would be better to use `CompletableFuture`. This would offer the possibility of applying callbacks on its completion, for example.

Best,

On Wed, May 26, 2021 at 3:28 PM Matthew de Detrich <matthew.dedetr...@aiven.io.invalid> wrote:

Maybe there was a miscommunication, but I agree with everything you said; I was just clarifying what my definition of blocking is (because I think there was a misunderstanding).

And yes, you are right: there is a limited number of threads, which is why blocking is a bad thing. Having threads sitting around waiting and not doing anything is a waste of resources, but ultimately this is also a performance problem, because if you don't block you can simply process more IO tasks on a given machine/instance (hence greater performance).

In any case, as was clarified, the current behavior of send() needs to be fixed. It returns a Future, but since it internally blocks and uses the caller's thread, from an API perspective it gives the incorrect impression that it's asynchronous (when it's not).

On Wed, May 26, 2021 at 3:15 PM Ryanne Dolan <ryannedo...@gmail.com> wrote:

Matthew, it's more than performance tho. In many frameworks the number of request threads is purposefully constrained, and blocking one means you have one less to handle requests with. When you're handling a large number of requests with a small number of threads, any blocking can lead to thread exhaustion.

For this reason, you'll often see send() wrapped in a future or thread pool. But it's surprising that this would be required, since send() already returns a future.

Additionally, even when send() does not actually block, it does a lot of work on the caller's thread, which is likewise surprising given that a future is returned. The effect is the same: fewer threads are available to handle requests, and you risk thread exhaustion.

I think we may incidentally improve performance if we introduce an internal thread pool, but the primary motivation here, at least for me, is to fix the lie the API is telling, not to improve performance.

Ryanne

On Wed, May 26, 2021, 6:51 AM Matthew de Detrich <matthew.dedetr...@aiven.io.invalid> wrote:

I think we may need to clarify terminology here. At least to me, blocking means suspending the current thread to wait for some operation (which is wasteful if we are dealing with IO-bound tasks). In other words, the "blocking" is an implementation detail of how to wait rather than whether we need to wait or not, so to me this is more of a performance question.

In the scenario you describe of Kafka clients producing too many messages, as you said, buffering is what should be done, but I wouldn't classify this as blocking.
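To make the distinction concrete with today's producer API (sketch only, assuming a `producer` and a `record` are in scope; checked exceptions elided):

    // "Blocking" in my sense: the calling thread is suspended until the ack.
    RecordMetadata md = producer.send(record).get();

    // Waiting without suspending a thread: send() hands back control and the
    // callback runs once the broker acknowledges (or the send fails).
    producer.send(record, (metadata, exception) -> {
        if (exception != null) {
            exception.printStackTrace(); // handle the failure
        }
    });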
On Mon, May 24, 2021 at 7:54 PM Colin McCabe <cmcc...@apache.org> wrote:

Hi all,

I agree that we should give users the option of having a fully async API, but I don't think external thread pools or queues are the right direction to go here. They add performance overhead and don't address the root causes of the problem.

There are basically two scenarios where we block, currently. One is when we are doing a metadata fetch. I think this is clearly a bug, or at least an implementation limitation. From the user's point of view, the fact that we are doing a metadata fetch is an implementation detail that really shouldn't be exposed like this. We have talked about fixing this in the past. I think we should just spend the time to do it.

The second scenario is where the client has produced too much data in too little time. This could happen if there is a network glitch, or the server is slower than expected. In this case, the behavior is intentional and not a bug. To understand this, think about what would happen if we didn't block. We would start buffering more and more data in memory, until finally the application died with an out-of-memory error. That would be frustrating for users and wouldn't add to the usability of Kafka.

We could potentially have an option to handle the out-of-memory scenario differently by returning an error code immediately rather than blocking. Applications would have to be rewritten to handle this properly, but it is a possibility. I suspect that most of them wouldn't use this, but we could offer it as a possibility for async purists (which might include certain frameworks). The big problem the users would have to solve is what to do with the record that they were unable to produce due to the buffer-full issue.
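(For reference, both waits are bounded today by the same producer setting; a minimal sketch:)

    Properties props = new Properties();
    // max.block.ms caps how long send() may block, both while fetching
    // metadata and while waiting for space in a full record buffer.
    props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "5000");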
best,
Colin

On Thu, May 20, 2021, at 10:35, Nakamura wrote:

> My suggestion was just to do this in multiple steps/phases: firstly let's fix the issue of send being misleadingly asynchronous (i.e. internally it's blocking) and then later on we can make the various threadpools configurable with a sane default.

I like that approach. I updated the "Which thread should be responsible for waiting" part of KIP-739 to add your suggestion as my recommended approach, thank you! If no one else has major concerns about that approach, I'll move the alternatives to "rejected alternatives".

On Thu, May 20, 2021 at 7:26 AM Matthew de Detrich <matthew.dedetr...@aiven.io.invalid> wrote:

@Nakamura

> On Wed, May 19, 2021 at 7:35 PM Nakamura <nny...@gmail.com> wrote:
>
> @Ryanne:
> In my mind's eye I slightly prefer throwing the "cannot enqueue" exception to satisfying the future immediately with the "cannot enqueue" exception. But I agree, it would be worth doing more research.
>
> @Matthew:
>
>> 3. Using multiple thread pools is definitely recommended for different types of tasks; for serialization, which is CPU-bound, you definitely would want to use a bounded thread pool that is fixed to the number of CPUs (or something along those lines). https://gist.github.com/djspiewak/46b543800958cf61af6efa8e072bfd5c is a very good guide on this topic.
>
> I think this guide is good in general, but I would be hesitant to follow its guidance re: offloading serialization without benchmarking it. My understanding is that context switches have gotten much cheaper, and that gains from cache locality are small, but they're not nothing. Especially if the workload has a very small serialization cost, I wouldn't be shocked if it made it slower. I feel pretty strongly that we should do more research here before unconditionally encouraging serialization in a threadpool. If people think it's important to do it here (e.g. if we think it would mean another big API change) then we should start thinking about what benchmarking we can do to gain higher confidence in this kind of change. However, I don't think it would change semantics as substantially as we're proposing here, so I would vote for pushing this to a subsequent KIP.

Of course, it's all down to benchmarking, benchmarking and benchmarking. Ideally speaking, you want to use all of the resources available to you, so if you have a bottleneck in serialization and you have many cores free, then using multiple cores may be more appropriate than a single core. Typically I would expect that using a single thread to do serialization is the most common situation; I was just responding to an earlier point that was made in regards to using ThreadPools for serialization (note that you can also just use a ThreadPool that is pinned to a single thread).
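For example, the kind of split the guide recommends looks roughly like this (sketch only; pool sizes illustrative):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    // CPU-bound work (e.g. serialization): bounded by the core count.
    ExecutorService cpuPool =
        Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

    // Blocking IO: an unbounded cached pool whose idle threads are reclaimed.
    ExecutorService ioPool = Executors.newCachedThreadPool();

    // Or pin work to a single thread when ordering or latency matters most.
    ExecutorService pinned = Executors.newSingleThreadExecutor();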
>> 4. Regarding providing the ability for users to supply their own custom ThreadPool: this is more of an ergonomics question for the API. Especially when it gets to monitoring/tracing, giving users the ability to provide their own custom IO/CPU ThreadPools is ideal; however, as stated, doing so means a lot of boilerplate-heavy changes to the API. Typically speaking, a lot of monitoring/tracing/diagnosing is done on ExecutionContexts/ThreadPools (at least on a more rudimentary level), and hence allowing users to supply a global singleton ThreadPool for IO tasks and another for CPU tasks makes their lives a lot easier. However, due to the large amount of changes to the API, it may be more appropriate to just use internal thread pools (for now), since at least it's not any worse than what exists currently, and this can be an improvement that is done later?
>
> Is there an existing threadpool that you suggest we reuse? Or are you imagining that we make our own internal threadpool, and then maybe expose configuration flags to manipulate it? For what it's worth, I like having an internal threadpool (perhaps just FJP.commonPool) and then providing an alternative to pass your own threadpool. That way people who want finer control can get it, and everyone else can do OK with the default.

Indeed, that is what I am saying. The most ideal situation is that there is a default internal threadpool that Kafka uses, but users of Kafka can configure their own threadpool. Having a singleton ThreadPool for blocking IO, non-blocking IO and CPU-bound tasks which can be plugged into all of your libraries (including Kafka) makes resource management much easier to do, and also gives users control to override specific threadpools for exceptional cases (i.e. providing a ThreadPool that is pinned to a single core, which tends to give the best latency results if that is something critical for you).

My suggestion was just to do this in multiple steps/phases: firstly let's fix the issue of send being misleadingly asynchronous (i.e. internally it's blocking) and then later on we can make the various threadpools configurable with a sane default.

On Wed, May 19, 2021 at 6:01 AM Matthew de Detrich <matthew.dedetr...@aiven.io.invalid> wrote:

Here are my two cents (note that I am only seeing this on a surface level):

1. If we are going down this road it makes sense to do this "properly" (i.e. using queues as Ryanne suggested). The reason I am saying this is that it seems that the original goal of the KIP is for it to be used in other asynchronous systems, and from my personal experience, you really do need to make the implementation properly asynchronous, otherwise it's really not that useful.
2. Due to the previous point and what was said by others, this is likely going to break some existing semantics (i.e. people are currently relying on blocking semantics), so adding another method/interface plus deprecating the older one is more annoying but ideal.

3. Using multiple thread pools is definitely recommended for different types of tasks; for serialization, which is CPU-bound, you definitely would want to use a bounded thread pool that is fixed to the number of CPUs (or something along those lines). https://gist.github.com/djspiewak/46b543800958cf61af6efa8e072bfd5c is a very good guide on this topic.

4. Regarding providing the ability for users to supply their own custom ThreadPool: this is more of an ergonomics question for the API. Especially when it gets to monitoring/tracing, giving users the ability to provide their own custom IO/CPU ThreadPools is ideal; however, as stated, doing so means a lot of boilerplate-heavy changes to the API. Typically speaking, a lot of monitoring/tracing/diagnosing is done on ExecutionContexts/ThreadPools (at least on a more rudimentary level), and hence allowing users to supply a global singleton ThreadPool for IO tasks and another for CPU tasks makes their lives a lot easier. However, due to the large amount of changes to the API, it may be more appropriate to just use internal thread pools (for now), since at least it's not any worse than what exists currently, and this can be an improvement that is done later?

On Wed, May 19, 2021 at 2:56 AM Ryanne Dolan <ryannedo...@gmail.com> wrote:

I was thinking the sender would typically wrap send() in a backoff/retry loop, or else ignore any failures and drop sends on the floor (fire-and-forget), and in both cases I think failing immediately is better than blocking for a new spot in the queue or asynchronously failing somehow.

I think a failed future is adequate for the "explicit backpressure signal" while avoiding any blocking anywhere. I think if we try to asynchronously signal the caller of failure (either by asynchronously failing the future or invoking a callback off-thread or something), we'd force the caller to either block or poll waiting for that signal, which somewhat defeats the purpose we're after.
And of course blocking for a spot in the queue definitely defeats the purpose (tho perhaps ameliorates the problem some).

Throwing an exception to the caller directly (not via the future) is another option with precedent in Kafka clients, tho it doesn't seem as ergonomic to me.

It would be interesting to analyze some existing usage and determine how difficult it would be to convert it to the various proposed APIs.

Ryanne

On Tue, May 18, 2021, 3:27 PM Nakamura <nny...@gmail.com> wrote:

Hi Ryanne,

Hmm, that's an interesting idea. Basically it would mean that after calling send, you would also have to check whether the returned future had failed with a specific exception. I would be open to it, although I think it might be slightly more surprising, since right now the paradigm is "enqueue synchronously, the future represents whether we succeeded in sending or not", and the new one would be "enqueue synchronously; the future either represents whether we succeeded in enqueueing (in which case it will be failed immediately if we failed to enqueue) or whether we succeeded in sending".
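Concretely, the fail-fast path might look something like this sketch (Java 9+ for failedFuture; the capacity check and the enqueue helper are hypothetical):

    // Sketch: fail the returned future immediately instead of blocking
    // when the record buffer is out of space.
    if (!hasRoomFor(record)) { // hypothetical capacity check
        return CompletableFuture.failedFuture(
                new BufferExhaustedException("record buffer is full"));
    }
    return enqueue(record); // normal asynchronous path (hypothetical helper)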
But you're right, it should be on the table, thank you for suggesting it!

Best,
Moses

On Tue, May 18, 2021 at 12:23 PM Ryanne Dolan <ryannedo...@gmail.com> wrote:

Moses, in the case of a full queue, could we just return a failed future immediately?

Ryanne

On Tue, May 18, 2021, 10:39 AM Nakamura <nny...@gmail.com> wrote:

Hi Alexandre,

Thanks for bringing this up, I think I could use some feedback in this area. There are two mechanisms here: one for slowing down when we don't have the relevant metadata, and the other for slowing down when a queue has filled up. Although the first one applies backpressure somewhat inadvertently, we could still get in trouble if we're not providing information to the mechanism that monitors whether we're queueing too much. As for the second one, that is a classic backpressure use case, so it's definitely important that we don't drop that ability.

Right now backpressure is applied by blocking, which is a natural way to apply backpressure in synchronous systems, but it can lead to unnecessary slowdowns in asynchronous systems. In my opinion, the safest way to apply backpressure in an asynchronous model is to have an explicit backpressure signal. A good example would be returning an exception, and providing an optional hook to add a callback onto, so that you can be notified when it's ready to accept more messages.

However, this would be a really big change to how users use KafkaProducer#send, so I don't know how much appetite we have for making that kind of change. Maybe it would be simpler to remove "don't block when the per-topic queue is full" from the scope of this KIP, and only focus on when metadata is available? The downside is that we will probably want to change the API again later to fix this, so it might be better to just rip the bandaid off now.

One slightly nasty thing here is that because queueing order is important, if we want to use exceptions, we will want to be able to signal the failure to enqueue to the caller in such a way that they can still enforce message order if they want. So we can't embed the failure directly in the returned future; we should either return two futures (nested, or as a tuple) or else throw an exception to signal backpressure.
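The nested variant would look something like this sketch (shape illustrative only):

    // Sketch: the outer future completes (or fails fast) on enqueue,
    // the inner future completes when the broker acknowledges the record.
    CompletableFuture<CompletableFuture<RecordMetadata>> send(ProducerRecord<K, V> record);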
So there are a few things we should work out here:

1. Should we keep the "too many bytes enqueued" part of this in scope? (I would say yes, so that we can minimize churn in this API.)

2. How should we signal backpressure so that it's appropriate for asynchronous systems? (I would say that we should throw an exception. If we choose this and we want to pursue the queueing path, we would *not* want to enqueue messages that would push us over the limit, would only want to enqueue messages when we're waiting for metadata, and would want to keep track of the total number of bytes for those messages.)

What do you think?

Best,
Moses

On Sun, May 16, 2021 at 6:16 AM Alexandre Dupriez <alexandre.dupr...@gmail.com> wrote:

Hello Nakamura,

Thanks for proposing this change. I can see how the blocking behaviour can be a problem when integrating with reactive frameworks such as Akka. One of the questions I would have is how you would handle backpressure and avoid memory exhaustion when the producer's buffer is full and tasks would start to accumulate in the out-of-band queue or thread pool introduced with this KIP.

Thanks,
Alexandre

On Fri, May 14, 2021 at 15:55, Ryanne Dolan <ryannedo...@gmail.com> wrote:

Makes sense!

Ryanne

On Fri, May 14, 2021, 9:39 AM Nakamura <nny...@gmail.com> wrote:

Hey Ryanne,

I see what you're saying about serde blocking, but I think we should consider it out of scope for this patch.
Right now we've nailed down a couple of use cases where we can unambiguously say "I can make progress now" or "I cannot make progress now", which makes it possible to offload to a different thread only if we are unable to make progress. Extending this to CPU work like serde would mean always offloading, which would be a really big performance change. It might be worth exploring anyway, but I'd rather keep this patch focused on improving ergonomics, rather than muddying the waters with evaluating performance very deeply.

I think if we really do want to support serdes or interceptors that do IO on the send path (which seems like an anti-pattern to me), we should consider making that a separate KIP, and probably also consider changing the API to use Futures (or CompletionStages). But I would rather avoid scope creep, so that we have a better chance of fixing this part of the problem.

Yes, I think some exceptions will move to being async instead of sync. They'll still be surfaced in the Future, so I'm not so confident that it would be that big a shock to users, though.

Best,
Moses

On Thu, May 13, 2021 at 7:44 PM Ryanne Dolan <ryannedo...@gmail.com> wrote:

Re serialization, my concern is that serialization often accounts for a lot of the cycles spent before returning the future. It's not blocking per se, but it's the same effect from the caller's perspective.
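Roughly, today's send() does something like this on the calling thread before any future exists (a simplified sketch of the internals, not the actual code):

    // Simplified sketch of the caller-thread work inside send():
    waitOnMetadata(record.topic(), maxBlockMs);   // may block
    byte[] keyBytes = keySerializer.serialize(record.topic(), record.key());
    byte[] valueBytes = valueSerializer.serialize(record.topic(), record.value());
    appendToAccumulator(keyBytes, valueBytes);    // may block if the buffer is full
    // ...only after all of this does the caller get a Future back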
Moreover, serde impls often block themselves, e.g. when fetching schemas from a registry. I suppose it's also possible to block in Interceptors (e.g. writing audit events or metrics), which happens before serdes, iiuc. So any blocking in either of those plugins would block the send unless we queue first.

So I think we want to queue first and do everything off-thread when using the new API, whatever that looks like. I just want to make sure we don't do that for clients that wouldn't expect it.

Another consideration is exception handling. If we queue right away, we'll defer some exceptions that currently are thrown to the caller (before the future is returned). In the new API, send() wouldn't throw any exceptions, and instead the future would fail. I think that might mean that a new method signature is required.

Ryanne

On Thu, May 13, 2021, 2:57 PM Nakamura <nakamura.mo...@gmail.com> wrote:

Hey Ryanne,

I agree we should add an additional constructor (or else an additional overload in KafkaProducer#send, but the new constructor would be easier to understand) if we're targeting the "user provides the thread" approach.
From looking at the code, I think we can keep record serialization on the user thread, if we consider that an important part of the semantics of this method. It doesn't seem like serialization depends on knowing the cluster; I think it's incidental that it comes after the first "blocking" segment in the method.

Best,
Moses

On Thu, May 13, 2021 at 2:38 PM Ryanne Dolan <ryannedo...@gmail.com> wrote:

Hey Moses, I like the direction here. My thinking is that a single additional work queue, s.t. send() can enqueue and return, seems like the lightest touch. However, I don't think we can trivially process that queue in an internal thread pool without subtly changing behavior for some users. For example, users will often run send() in multiple threads in order to serialize faster, but that wouldn't work quite the same if there were an internal thread pool.

For this reason I'm thinking we need to make sure any such changes are opt-in. Maybe a new constructor with an additional ThreadFactory parameter. That would at least clearly indicate that work will happen off-thread, and would require opt-in for the new behavior.
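Something like this sketch (the constructor is hypothetical, not in the current API; `configs` and the serializers are assumed to be in scope):

    // Hypothetical opt-in constructor -- signature illustrative only:
    ThreadFactory sendWorkers = r -> new Thread(r, "kafka-producer-send-worker");
    Producer<String, String> producer =
            new KafkaProducer<>(configs, keySerializer, valueSerializer, sendWorkers);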
Under the hood, this ThreadFactory could be used to create the worker thread that processes queued sends, which could fan out to per-partition threads from there.

So then you'd have two ways to send: the existing way, where serdes and interceptors and whatnot are executed on the calling thread, and the new way, which returns right away and uses an internal Executor. As you point out, the semantics would be identical in either case, and it would be very easy for clients to switch.

Ryanne

On Thu, May 13, 2021, 9:00 AM Nakamura <nny...@gmail.com> wrote:

Hey Folks,

I just posted a new proposal <https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181306446> in the wiki. I think we have an opportunity to improve the KafkaProducer#send user experience. It would certainly make our lives easier. Please take a look! There are two subproblems that I could use guidance on, so I would appreciate feedback on both of them.
Best,
Moses