Hey Erik,

Sorry for holding up this email for a few days since Colin's response
includes some of my concerns.  I'm in favor of this KIP, and I think your
approach seems safe.  Of course, I probably missed something therefore I
think this KIP needs to cover different use cases to demonstrate it doesn't
cause any unsafe access. I think this can be demonstrated via diagrams and
some code in the KIP.

Thanks,
P

On Sat, Jul 8, 2023 at 12:28 PM Erik van Oosten
<e.vanoos...@grons.nl.invalid> wrote:

> Hello Colin,
>
>  >> In KIP-944, the callback thread can only delegate to another thread
> after reading from and writing to a threadlocal variable, providing the
> barriers right there.
>
>  > I don't see any documentation that accessing thread local variables
> provides a total store or load barrier. Do you have such documentation?
> It seems like if this were the case, we could eliminate volatile
> variables from most of the code base.
>
> Now I was imprecise. The thread-locals are only somewhat involved. In
> the KIP proposal the callback thread reads an access key from a
> thread-local variable. It then needs to pass that access key to another
> thread, which then can set it on its own thread-local variable. The act
> of passing a value from one thread to another implies that a memory
> barrier needs to be passed. However, this is all not so relevant since
> there is no need to pass the access key back when the other thread is done.
>
> But now I think about it a bit more, the locking mechanism runs in a
> synchronized block. If I remember correctly this should be enough to
> pass read and write barriers.
>
>  >> In the current implementation the consumer is also invoked from
> random threads. If it works now, it should continue to work.
>  > I'm not sure what you're referring to. Can you expand on this?
>
> Any invocation of the consumer (e.g. method poll) is not from a thread
> managed by the consumer. This is what I was assuming you meant with the
> term 'random thread'.
>
>  > Hmm, not sure what you mean by "cooperate with blocking code." If you
> have 10 green threads you're multiplexing on to one CPU thread, and that
> CPU thread gets blocked because of what one green thread is doing, the
> other 9 green threads are blocked too, right? I guess it's "just" a
> performance problem, but it still seems like it could be a serious one.
>
> There are several ways to deal with this. All async runtimes I know
> (Akka, Zio, Cats-effects) support this by letting you mark a task as
> blocking. The runtime will then either schedule it to another
> thread-pool, or it will grow the thread-pool to accommodate. In any case
> 'the other 9 green threads' will simply be scheduled to another real
> thread. In addition, some of these runtimes detect long running tasks
> and will reschedule waiting tasks to another thread. This is all a bit
> off topic though.
>
>  > I don't see why this has to be "inherently multi-threaded." Why can't
> we have the other threads report back what messages they've processed to
> the worker thread. Then it will be able to handle these callbacks
> without involving the other threads.
>
> Please consider the context which is that we are running inside the
> callback of the rebalance listener. The only way to execute something
> and also have a timeout on it is to run the something on another thread.
>
> Kind regards,
>      Erik.
>
>
> Op 08-07-2023 om 19:17 schreef Colin McCabe:
> > On Sat, Jul 8, 2023, at 02:41, Erik van Oosten wrote:
> >> Hi Colin,
> >>
> >> Thanks for your thoughts and taking the time to reply.
> >>
> >> Let me take away your concerns. None of your worries are an issue with
> >> the algorithm described in KIP-944. Here it goes:
> >>
> >>   > It's not clear ot me that it's safe to access the Kafka consumer or
> >>> producer concurrently from different threads.
> >> Concurrent access is /not/ a design goal of KIP-944. In fact, it goes
> >> through great lengths to make sure that this cannot happen.
> >>
> >> *The only design goal is to allow callbacks to call the consumer from
> >> another thread.*
> >>
> >> To make sure there are no more misunderstandings about this, I have
> >> added this goal to the KIP.
> >>
> > Hi Erik,
> >
> > Sorry, I spoke imprecisely. My concern is not concurrent access, but
> multithreaded access in general. Basically cache line visibility issues.
> >
> >>   > This is true even if the accesses happen at different times, because
> >>> modern CPUs require memory barriers to guarantee inter-thread visibilty
> >>> of loads and stores.
> >> In KIP-944, the callback thread can only delegate to another thread
> >> after reading from and writing to a threadlocal variable, providing the
> >> barriers right there.
> >>
> > I don't see any documentation that accessing thread local variables
> provides a total store or load barrier. Do you have such documentation? It
> seems like if this were the case, we could eliminate volatile variables
> from most of the code base.
> >
> >>   > I know that there are at least a few locks in the consumer code now,
> >>> due to our need to send heartbeats from a worker thread. I don't think
> >>> those would be sufficient to protect a client that is making calls from
> >>> random threads.
> >> In the current implementation the consumer is also invoked from random
> >> threads. If it works now, it should continue to work.
> >>
> > I'm not sure what you're referring to. Can you expand on this?
> >
> >>   > There has been some discussion of moving to a more traditional model
> >>> where people make calls to the client and the clients passes the given
> >>> data to a single background worker thread. This would avoid a lot lof
> >>> the footguns of the current model and probably better reflect how
> people
> >>> actually use the client.
> >> That is awesome. However, I'd rather not wait for that.
> >>
> >>   > Another issue is that neither the producer nor the consumer is fully
> >>> nonblocking. There are some corner cases where we do in fact block.
> From
> >>> memory, the producer blocks in some "buffer full" cases, and the
> >>> consumer blocks sometimes when fetching metadata.
> >> I am aware of that. This is not an issue; all async runtimes can
> >> cooperate with blocking code.
> >>
> > Hmm, not sure what you mean by "cooperate with blocking code." If you
> have 10 green threads you're multiplexing on to one CPU thread, and that
> CPU thread gets blocked because of what one green thread is doing, the
> other 9 green threads are blocked too, right? I guess it's "just" a
> performance problem, but it still seems like it could be a serious one.
> >
> >>   > I suspect it would be more appropriate for Kotlin coroutines, Zio
> >>> coroutines and so on to adopt this "pass messages to and from a
> >>> background worker thread" model than to try to re-engineer the Kafka
> >>> client ot work from random threads.
> >> In both zio-kafka and fs2-kafka this is already the approach we are
> taking.
> >>
> >> Unfortunately, the Kafka consumer forces us to perform some work in
> >> callbacks:
> >>
> >>    * commit completed callback: register that the callback is complete,
> >>    * partition revoked callback: in this callback we need to submit
> >>      commits from everything consumed and processed so far, using
> >>      timeouts if processing takes to long. In an async runtime, this is
> >>      an inherently multi-threaded process. Especially, we cannot do
> >>      timeouts without involving multiple threads.
> >>
> > I don't see why this has to be "inherently multi-threaded." Why can't we
> have the other threads report back what messages they've processed to the
> worker thread. Then it will be able to handle these callbacks without
> involving the other threads.
> >
> > regards,
> > Colin
> >
> >> I have extended the KIP's motivation to explain the major use case.
> >>
> >> Please read KIP-944 again. Even though the description is extensive
> >> (this callback from callback stuff is tricky), you will find that my
> >> goals are modest.
> >>
> >> Also the implementation is just a few lines. With understanding of the
> >> idea it should not be a lot of work to follow it.
> >>
> >> Kind regards,
> >>       Erik.
> >>
> >>
> >> Op 07-07-2023 om 19:57 schreef Colin McCabe:
> >>> Hi Erik,
> >>>
> >>> It's not clear ot me that it's safe to access the Kafka consumer or
> producer concurrently from different threads. There are data structures
> that aren't protected by locks, so I wouldn't necessarily expect accessing
> and mutating them in a concurrent way to work. This is true even if the
> accesses happen at different times, because modern CPUs require memory
> barriers to guarantee inter-thread visibilty of loads and stores.
> >>>
> >>> I am writing this is without doing a detailed dive into the code (I
> haven't been into the consumer / producer code in a bit.) Someone who has
> worked more on the consumer recently might be able to give specific
> examples of things that wouldn't work.
> >>>
> >>> I know that there are at least a few locks in the consumer code now,
> due to our need to send heartbeats from a worker thread. I don't think
> those would be sufficient to protect a client that is making calls from
> random threads.
> >>>
> >>> There has been some discussion of moving to a more traditional model
> where people make calls to the client and the clients passes the given data
> to a single background worker thread. This would avoid a lot lof the
> footguns of the current model and probably better reflect how people
> actually use the client.
> >>>
> >>> Another issue is that neither the producer nor the consumer is fully
> nonblocking. There are some corner cases where we do in fact block. From
> memory, the producer blocks in some "buffer full" cases, and the consumer
> blocks sometimes when fetching metadata.
> >>>
> >>> I suspect it would be more appropriate for Kotlin coroutines, Zio
> coroutines and so on to adopt this "pass messages to and from a background
> worker thread" model  than to try to re-engineer the Kafka client ot work
> from random threads.
> >>>
> >>> There is actually somed good  advice about how to handle multiple
> threads in the KafkaConsumer.java header file itself. Check the sections
> "One Consumer Per Thread" and "Decouple Consumption and Processing." What
> I'm recommending here is essentially the latter.
> >>>
> >>> I do understand that it's frustrating to not get a quick response.
> However, overall I think this one needs a lot more discussion before
> getting anywhere near a vote. I will leave a -1 just as a procedural step.
> Maybe some of the people working in the client area can also chime in.
> >>>
> >>> best,
> >>> Colin
> >>>
> >>>
> >>> On Thu, Jul 6, 2023, at 12:02, Erik van Oosten wrote:
> >>>> Dear PMCs,
> >>>>
> >>>> So far there have been 0 responses to KIP-944. I understand this may
> not
> >>>> be something that keeps you busy, but this KIP is important to people
> >>>> that use async runtimes like Zio, Cats and Kotlin.
> >>>>
> >>>> Is there anything you need to come to a decision?
> >>>>
> >>>> Kind regards,
> >>>>        Erik.
> >>>>
> >>>>
> >>>> Op 05-07-2023 om 11:38 schreef Erik van Oosten:
> >>>>> Hello all,
> >>>>>
> >>>>> I'd like to call a vote on KIP-944 Support async runtimes in
> consumer.
> >>>>> It has has been 'under discussion' for 7 days now. 'Under discussion'
> >>>>> between quotes, because there were 0 comments so far. I hope the KIP
> >>>>> is clear!
> >>>>>
> >>>>> KIP description:https://cwiki.apache.org/confluence/x/chw0Dw
> >>>>>
> >>>>> Kind regards,
> >>>>>       Erik.
> >>>>>
> >>>>>
> >> --
> >> Erik van Oosten
> >> e.vanoos...@grons.nl
> >> https://day-to-day-stuff.blogspot.com
>
> --
> Erik van Oosten
> e.vanoos...@grons.nl
> https://day-to-day-stuff.blogspot.com
>
>

Reply via email to