Did you end up fixing the PIP? I see it wasn’t implemented in the end.

On 11 Sep 2022 at 7:18:07, Qiang Huang <qiang.huang1...@gmail.com> wrote:

> Hi Asaf, thank you for the very detailed reply.
>
> > The problem we have today is that while we have sent a request to reset
> > the subscription position, the broker decides to:
> >
> > 1. Close the TCP connection, which in turn causes the client to clear any
> > pending messages it has in the queue.
> > 2. Continue to send messages from the previous position, up to a certain
> > point where the broker "shifts gear" and starts sending messages from the
> > new position.
> >
> > Since Pulsar doesn't follow a request-response model but has a
> > bi-directional protocol, the client can send a command to fetch messages
> > using a new session sequence number, while the server can still send
> > messages using the old session number. Without the Session Sequence Number,
> > the client can't tell which session the messages pushed from the server
> > belong to.
>
> I totally agree with you. I realized something was wrong in the PIP when I
> re-read this part of the code.
>
> > # What are the issues with this PIP?
> >
> > 1. The PIP decides to solve the problem listed above *only* for exclusive
> > and failover subscriptions, where you have only a single consumer. The
> > problem still remains at large with Shared or Key Shared subscriptions.
> > 2. The cost of solving a small portion of the problem is high:
> >    Added Complexity - Adding another field to the protocol, and another
> >    thing to check. I believe we should aim to reduce the cognitive load of
> >    the developers of Pulsar.
> > 3. There are no rejected solutions - We always need to examine all
> > available options and list why we decided against them.
> > 4. Lack of background knowledge (context) - it's super hard IMO to grasp
> > the idea with so much context missing: the client-server protocol
> > pertaining to this PIP, including its async nature, what an epoch is and
> > why it was introduced, and what flow permits are. I'm not saying to
> > explain all of Pulsar in this doc, just to include a brief explanation of
> > that terminology.
> >
> > # What We Suggest
> >
> > Rethink the solution.
> >
> > 1. The consumer (one of many) will send a seek command to the broker, and
> > at the same time clear its internal queue and wait for a response from the
> > broker.
> > 2. The broker, upon receiving the seek command, will:
> >      a. Stop dispatching messages to consumers.
> >      b. Notify all consumers via a (new) command that the subscription
> >         position was asked to be reset. Consumers receiving this command
> >         will clear their internal queue. The broker will no longer close
> >         the TCP connection (with its adverse effects on other consumers
> >         and producers "riding" on that connection).
> >      c. Reset the cursor to the newly requested position.
> >      d. Continue dispatching messages from the newly requested position to
> >         consumers.
>
> Good suggestions. I'll look into these issues and rethink the solution. I
> will rewrite this PIP according to your suggestions.
> Thanks again for your review.
>
> On Wed, 7 Sep 2022 at 23:12, Asaf Mesika <asaf.mes...@gmail.com> wrote:
>
> Hi Qiang,
>
> We had a brainstorming session on this PIP over Zoom with Penghui, Hang,
> and more people, and I'm jotting down our feedback here.
>
> Before I do that, I just want to write down my own understanding of the
> document, for other readers:
>
>
> # Context
>
> Pulsar, as opposed to other distributed / streaming systems, took the
> approach of a push model. The client (the consumer, that is) asks the broker
> for 1000 messages (that's the remaining capacity of the consumer's internal
> queue); this process is called flow permits. The broker is now permitted to
> send 1000 messages to the client, and it uses the TCP connection to send
> those messages as they become ready to be sent.
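The flow-permit exchange described above can be pictured with a toy model. This is a minimal Python sketch under simplifying assumptions. `ToyBroker`, `ToyConsumer`, and the `wire` deque standing in for the TCP connection are hypothetical names, not Pulsar's client or broker API. A queue size of 4 replaces the 1000 from the example.

```python
from collections import deque


class ToyBroker:
    """Pushes messages only while the consumer has granted it permits."""

    def __init__(self, backlog):
        self.backlog = deque(backlog)  # messages after the cursor position
        self.permits = 0

    def on_flow(self, permits):
        # A flow request grants permission to push this many more messages.
        self.permits += permits

    def dispatch(self, wire):
        # Push as many messages as the permits allow, without being asked again.
        while self.permits > 0 and self.backlog:
            wire.append(self.backlog.popleft())
            self.permits -= 1


class ToyConsumer:
    def __init__(self, queue_size):
        self.queue_size = queue_size
        self.queue = deque()

    def free_capacity(self):
        return self.queue_size - len(self.queue)


broker = ToyBroker(range(10))
consumer = ToyConsumer(queue_size=4)
wire = deque()  # stands in for the TCP connection

broker.on_flow(consumer.free_capacity())  # grant 4 permits
broker.dispatch(wire)
while wire:
    consumer.queue.append(wire.popleft())
print(list(consumer.queue))  # [0, 1, 2, 3]
```

The broker never pushes more than the consumer asked for. The remaining backlog waits until the consumer frees capacity and sends another flow request.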
>
>
> The consumer can request that the subscription reset its position to a
> new position.
>
> The problem we have today is that while we have sent a request to reset the
> subscription position, the broker decides to:
>
> 1. Close the TCP connection, which in turn causes the client to clear any
> pending messages it has in the queue.
> 2. Continue to send messages from the previous position, up to a certain
> point where the broker "shifts gear" and starts sending messages from the
> new position.
>
>
> So the problem is that you would expect that after the connection is
> reset, only messages from the new position would be sent to the consumer,
> but that doesn't happen.
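The race described above can be reproduced in a few lines. This is a hypothetical Python sketch, not Pulsar's dispatcher. `ToyBroker` and the `wire` deque are illustrative stand-ins, and the connection close is left out. The moment the seek "takes effect" is chosen by hand to mimic the broker shifting gear.

```python
from collections import deque


class ToyBroker:
    def __init__(self, log):
        self.log = log
        self.cursor = 0  # next position the broker will push

    def push(self, wire, n):
        # Write n messages from the current cursor onto the connection.
        for _ in range(n):
            wire.append(self.log[self.cursor])
            self.cursor += 1


broker = ToyBroker([f"m{i}" for i in range(10)])
wire = deque()          # written by the broker, not yet read by the client
client_queue = deque()

broker.push(wire, 2)    # m0, m1 delivered normally
while wire:
    client_queue.append(wire.popleft())

# The consumer clears its queue and asks to seek to position 7 ...
client_queue.clear()
# ... but the broker has not processed the seek yet and keeps pushing
# from the old position.
broker.push(wire, 2)    # m2, m3
broker.cursor = 7       # the seek takes effect: the broker "shifts gear"
broker.push(wire, 2)    # m7, m8

while wire:
    client_queue.append(wire.popleft())
print(list(client_queue))  # ['m2', 'm3', 'm7', 'm8']
```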
>
>
> We have to keep in mind that we effectively have two scenarios here from
> the point of view of the consumer:
>
> 1. Single consumer - either because it uses an Exclusive subscription, or
> because it is the consumer attached to a single topic in a Failover
> subscription.
> 2. Multiple consumers - in Shared or Key Shared subscriptions. In this
> case, any one of those consumers can decide to reset the position of the
> *subscription*. When that happens, the broker again decides to reset all
> existing TCP connections to all consumers upon receiving the seek command,
> and you would expect any messages sent afterward to be from the new
> position, which again doesn't happen.
>
>
> Another really important piece of information we need to give the reader
> here is the notion of an epoch. The epoch was first introduced to Pulsar in
> PIP-84. The idea is that every time the client starts a "session" of
> requesting and receiving messages in response, the client sends a Session
> Sequence Number, and the server responds to those message requests with the
> same session sequence number. Since Pulsar doesn't follow a request-response
> model but has a bi-directional protocol, the client can send a command to
> fetch messages using a new session sequence number, while the server can
> still send messages using the old session number. Without the Session
> Sequence Number, the client can't tell which session the messages pushed
> from the server belong to. That Session Sequence Number is the one referred
> to as Epoch in PIP-84 and also in this PIP.
>
> The idea was to demarcate the responses coming from the server based on
> the commands the client sends, as they are *independent* (async).
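A minimal Python sketch of how an epoch lets the consumer demarcate responses. It assumes each pushed message carries the epoch of the session it was sent for. `Message` and `EpochFilteringConsumer` are hypothetical names, and PIP-84's actual wire format and client logic may differ.

```python
from dataclasses import dataclass


@dataclass
class Message:
    epoch: int      # session the broker believed it was serving
    payload: str


class EpochFilteringConsumer:
    def __init__(self):
        self.epoch = 0
        self.queue = []

    def start_new_session(self):
        # Bump the epoch and drop anything buffered from the previous session.
        # The returned value would be attached to the outgoing command.
        self.epoch += 1
        self.queue.clear()
        return self.epoch

    def on_message(self, msg):
        # A message tagged with an older epoch was pushed before the broker saw
        # the new command, so it is discarded instead of being delivered.
        if msg.epoch < self.epoch:
            return False
        self.queue.append(msg)
        return True


consumer = EpochFilteringConsumer()
consumer.start_new_session()              # epoch 1
consumer.on_message(Message(1, "a"))
consumer.start_new_session()              # epoch 2, e.g. after a new request
consumer.on_message(Message(1, "b"))      # still in flight from epoch 1: dropped
consumer.on_message(Message(2, "c"))
print([m.payload for m in consumer.queue])  # ['c']
```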
>
>
> # What are the issues with this PIP?
>
> 1. The PIP decides to solve the problem listed above *only* for exclusive
> and failover subscriptions, where you have only a single consumer. The
> problem still remains at large with Shared or Key Shared subscriptions.
> 2. The cost of solving a small portion of the problem is high:
>    Added Complexity - Adding another field to the protocol, and another
>    thing to check. I believe we should aim to reduce the cognitive load of
>    the developers of Pulsar.
> 3. There are no rejected solutions - We always need to examine all
> available options and list why we decided against them.
> 4. Lack of background knowledge (context) - it's super hard IMO to grasp
> the idea with so much context missing: the client-server protocol
> pertaining to this PIP, including its async nature, what an epoch is and
> why it was introduced, and what flow permits are. I'm not saying to explain
> all of Pulsar in this doc, just to include a brief explanation of that
> terminology.
>
>
> # What We Suggest
>
> Rethink the solution.
>
> 1. The consumer (one of many) will send a seek command to the broker, and
> at the same time clear its internal queue and wait for a response from the
> broker.
> 2. The broker, upon receiving the seek command, will:
>      a. Stop dispatching messages to consumers.
>      b. Notify all consumers via a (new) command that the subscription
>         position was asked to be reset. Consumers receiving this command
>         will clear their internal queue. The broker will no longer close
>         the TCP connection (with its adverse effects on other consumers
>         and producers "riding" on that connection).
>      c. Reset the cursor to the newly requested position.
>      d. Continue dispatching messages from the newly requested position to
>         consumers.
>
>
> The disadvantage here is that we need to alter the client so it knows about
> the new command and acts accordingly, yet I think that is accidental
> complexity stemming from the client-server protocol being bi-directional
> rather than request-response.
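The broker-side steps a-d of the suggestion can be sketched as a toy model for a multi-consumer subscription. Everything here is hypothetical:

- `on_reset_notification` stands in for the new command the suggestion calls for.
- Dispatch by position modulo the number of consumers stands in for a Shared dispatcher.
- Step 1 (the requesting consumer waiting for a response) is not modeled.

```python
from collections import deque


class Consumer:
    def __init__(self, name):
        self.name = name
        self.queue = deque()

    def on_reset_notification(self):
        # Hypothetical new broker-to-client command: drop every message that
        # was buffered from the old position. The connection stays open.
        self.queue.clear()


class Subscription:
    def __init__(self, log, consumers):
        self.log = log              # the topic's messages, indexed by position
        self.cursor = 0             # next position to dispatch
        self.consumers = consumers
        self.dispatching = True

    def dispatch(self, count):
        # Spread messages across consumers by position, standing in for a
        # Shared subscription's dispatcher.
        for _ in range(count):
            if not self.dispatching or self.cursor >= len(self.log):
                return
            consumer = self.consumers[self.cursor % len(self.consumers)]
            consumer.queue.append(self.log[self.cursor])
            self.cursor += 1

    def on_seek(self, position):
        self.dispatching = False            # a. stop dispatching
        for consumer in self.consumers:     # b. notify all consumers
            consumer.on_reset_notification()
        self.cursor = position              # c. reset the cursor
        self.dispatching = True             # d. resume from the new position


log = [f"m{i}" for i in range(10)]
c1, c2 = Consumer("c1"), Consumer("c2")
sub = Subscription(log, [c1, c2])
sub.dispatch(4)     # c1 gets m0, m2; c2 gets m1, m3
sub.on_seek(7)      # both queues are cleared
sub.dispatch(3)
print(list(c1.queue), list(c2.queue))  # ['m8'] ['m7', 'm9']
```

The sketch collapses network delivery into direct method calls. In a real protocol, the notification would share each consumer's TCP connection with its messages. Assuming the broker writes the notification after the last old-position message, TCP ordering would deliver those messages first, and they would be cleared along with the queue.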
>
>
> Thanks,
>
> Asaf
>
>
> On Mon, Aug 1, 2022 at 6:43 AM Qiang Huang <qiang.huang1...@gmail.com>
> wrote:
>
> > Sure. You can refer to PIP-84:
> > https://github.com/apache/pulsar/wiki/PIP-84-:-Pulsar-client:-Redeliver-command-add-epoch
> >
> > On Fri, 29 Jul 2022 at 10:22, Zike Yang <z...@apache.org> wrote:
> >
> > > Hi, Qiang
> > >
> > > > It is necessary to check the current cursor status when handling a
> > > > flowPermits request on the server side. If the server is handling a
> > > > seek request, it should ignore the flowPermits request because the
> > > > request is illegal.
> > >
> > > Thanks for your explanation. I think it's better to add this
> > > explanation to the PIP.
> > >
> > > > The reconnected consumer can be regarded as a new consumer with a new
> > > > epoch.
> > >
> > > The consumer will reconnect to the broker during the seek operation,
> > > and this will change the existing behavior. It doesn't seem to make
> > > sense. Please correct me if I have misunderstood.
> > >
> > > Thanks,
> > > Zike Yang
> > >
> > > On Wed, Jul 27, 2022 at 8:06 PM Qiang Huang <qiang.huang1...@gmail.com>
> > > wrote:
> > > >
> > > > Thanks Zike.
> > > >
> > > > > > - stage 1: Check the current cursor status when handling
> > > > > > flowPermits from the server side.
> > > > >
> > > > > Could you explain more details on this step? It looks like there is
> > > > > not much described above. What kind of status needs to be checked,
> > > > > and what action will the broker take?
> > > >
> > > > It is necessary to check the current cursor status when handling a
> > > > flowPermits request on the server side. If the server is handling a
> > > > seek request, it should ignore the flowPermits request because the
> > > > request is illegal.
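A minimal Python sketch of the check described in this answer. It assumes the broker tracks a per-subscription flag while a seek is in progress. `ToySubscription`, `begin_seek`, `complete_seek`, and `handle_flow` are hypothetical names, not Pulsar broker methods.

```python
class ToySubscription:
    """Toy model: flow permits are ignored while a seek is in progress."""

    def __init__(self):
        self.seek_in_progress = False
        self.permits = 0

    def begin_seek(self):
        self.seek_in_progress = True

    def complete_seek(self):
        self.seek_in_progress = False

    def handle_flow(self, permits):
        # While the cursor is being reset, the request is treated as illegal
        # and dropped, so no permits are granted.
        if self.seek_in_progress:
            return False
        self.permits += permits
        return True


sub = ToySubscription()
sub.begin_seek()
sub.handle_flow(100)   # ignored: seek in progress
sub.complete_seek()
sub.handle_flow(50)    # accepted
print(sub.permits)     # 50
```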
> > > >
> > > > > > 1. A consumer that reconnects needs to reset its epoch.
> > > > >
> > > > > Why do we need to reset the epoch when the consumer reconnects?
> > > >
> > > > The reconnected consumer can be regarded as a new consumer with a new
> > > > epoch.
> > >
> >
> > --
> > BR,
> > Qiang Huang
>
> --
> BR,
> Qiang Huang
>
