Hi, Michael:

Michael Marshall <mmarsh...@apache.org> wrote on Tue, Mar 21, 2023 at 23:17:

>
> One more point. Instead of keeping track of the latest message seen by
> the application, the logic in my solution would actually just check
> the last message in the `incomingMessages` queue (as in the most
> recently added), and use that as the read position in the subscribe
> command. If we made this change, we would have to change this code [0]
> to not drop the `incomingMessages` queue.

Case 1:
How do we define the message that the application has seen?
I think it is [0]. When the `incomingMessages` queue is empty,
how do we get the correct `startPosition`?
I think we would have to lock the receive logic in [1]:
```
synchronized (this) {
    // take() may block indefinitely while this consumer's monitor is held
    message = incomingMessages.take();
    messageProcessed(message);
}
```
Why would we need to call `BlockingQueue.take` inside a `synchronized`
block? That is bad code: `take()` can block while holding the consumer's lock.
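
To make the Case 1 concern concrete, here is a minimal, self-contained sketch. `NaiveConsumer`, `lastSeenForSubscribe` and `Case1Demo` are hypothetical names, not Pulsar classes, and message ids are plain `long`s: `receive()` calls `take()` while holding the consumer's monitor, so a reconnect path that needs the same monitor to read the last seen id stays BLOCKED until a message arrives.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical, heavily simplified consumer; this is NOT Pulsar's ConsumerImpl.
class NaiveConsumer {
    final BlockingQueue<Long> incomingMessages = new LinkedBlockingQueue<>();
    private long lastSeenId = -1;

    // Case 1 proposal: take() and the bookkeeping run under the same monitor,
    // so the thread parks inside take() while still holding `this`.
    long receive() throws InterruptedException {
        synchronized (this) {
            long id = incomingMessages.take();
            lastSeenId = id; // stands in for messageProcessed(message)
            return id;
        }
    }

    // Hypothetical reconnect step that reads the last seen id for Subscribe.
    synchronized long lastSeenForSubscribe() {
        return lastSeenId;
    }
}

class Case1Demo {
    // Polls for up to ~2 s until thread t reaches state s.
    private static boolean awaitState(Thread t, Thread.State s) throws InterruptedException {
        for (int i = 0; i < 200; i++) {
            if (t.getState() == s) {
                return true;
            }
            Thread.sleep(10);
        }
        return false;
    }

    // Returns true if the reconnect thread was BLOCKED while receive() waited.
    static boolean reconnectBlockedWhileReceiving() {
        NaiveConsumer consumer = new NaiveConsumer();
        Thread receiver = new Thread(() -> {
            try {
                consumer.receive();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread reconnect = new Thread(consumer::lastSeenForSubscribe);
        try {
            receiver.start();
            awaitState(receiver, Thread.State.WAITING); // parked in take(), monitor held
            reconnect.start();
            boolean blocked = awaitState(reconnect, Thread.State.BLOCKED);
            consumer.incomingMessages.offer(42L); // lets take() return and release the monitor
            receiver.join();
            reconnect.join();
            return blocked;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

`Case1Demo.reconnectBlockedWhileReceiving()` is expected to return `true`: the reconnect thread cannot proceed until the `offer` lets `take()` return. On an idle topic, that wait has no upper bound.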

Case 2:
If we subscribe with `startMessageId`, we also need to lock all enqueue
logic, such as [2], and check the consumer's current state:
```
synchronized (this) {
    // Only enqueue while the consumer is connected.
    if (isConnected()) {
        if (canEnqueueMessage(message) && incomingMessages.offer(message)) {
            // After we have enqueued the message on the `incomingMessages` queue, we cannot
            // touch the message instance anymore, since for pooled messages this instance
            // may already have been released and recycled.
            INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, messageSize);
            getMemoryLimitController().ifPresent(limiter -> limiter.forceReserveMemory(messageSize));
            updateAutoScaleReceiverQueueHint();
        }
    }
}
```
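
A simplified sketch of what the Case 2 locking implies, under stated assumptions: message ids are plain `long`s, `EnqueueGuardSketch` and its methods are hypothetical, a `LinkedBlockingDeque` stands in for the real queue so that `peekLast()` is available, and the resubscribe position follows the idea quoted above (the newest id in `incomingMessages`, or the last delivered id when the queue is empty). Every broker push and every receive now takes the same monitor as the reconnect path, which is the hot-path cost in question.

```java
import java.util.concurrent.LinkedBlockingDeque;

// Hypothetical sketch of the Case 2 locking; NOT Pulsar's ConsumerBase code.
class EnqueueGuardSketch {
    private final LinkedBlockingDeque<Long> incomingMessages = new LinkedBlockingDeque<>();
    private boolean connected = true;
    private long lastDelivered = -1; // last id handed to the application

    // Broker-push path: only enqueue while the current connection is live.
    synchronized boolean enqueue(long messageId) {
        if (!connected) {
            return false; // message from a stale connection: drop it
        }
        return incomingMessages.offer(messageId);
    }

    // Application path (simplified, non-blocking).
    synchronized Long poll() {
        Long id = incomingMessages.poll();
        if (id != null) {
            lastDelivered = id;
        }
        return id;
    }

    synchronized void connectionClosed() {
        connected = false;
    }

    // Reconnect path: computes the position for a hypothetical Subscribe field.
    // Holding the same monitor keeps it consistent with enqueue() and poll().
    synchronized long resubscribePosition() {
        Long newest = incomingMessages.peekLast();
        connected = true;
        return newest != null ? newest : lastDelivered;
    }
}
```

For example, after enqueuing ids 1 and 2, polling 1, and closing the connection, a late push of id 3 is rejected and `resubscribePosition()` returns 2.
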

Case 3:
Suppose the consumer sends the Subscribe command to the broker with
`startMessageId = 1`, and the broker pushes messages, but before they
have entered `incomingMessages`, the application calls redeliver.
In this case, we can't filter the messages correctly, right?
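
One possible reading of Case 3, replayed deterministically as a sketch. Assumptions: ids are plain `long`s, the only filter is the `startMessageId` sent in Subscribe, `redeliver()` only clears what is already queued, and `Case3Replay` is a hypothetical class. A push that is still on the wire when the application calls redeliver lands in the queue after the clear, next to the broker's redelivered copies.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, single-threaded replay of the Case 3 interleaving; no Pulsar API.
class Case3Replay {
    private final long startMessageId; // position sent in the Subscribe command
    final List<Long> incomingMessages = new ArrayList<>();

    Case3Replay(long startMessageId) {
        this.startMessageId = startMessageId;
    }

    // A broker push reaches the client: filter only by the subscribe position.
    void arrive(long id) {
        if (id > startMessageId) {
            incomingMessages.add(id);
        }
    }

    // Application calls redeliver: the client drops what is already queued.
    void redeliver() {
        incomingMessages.clear();
    }

    static List<Long> replay() {
        Case3Replay c = new Case3Replay(1);
        // The broker pushes 2 and 3 after Subscribe(startMessageId = 1).
        c.arrive(2);    // 2 reaches the queue
        c.redeliver();  // application redelivers while 3 is still on the wire
        c.arrive(3);    // stale push from before the redeliver
        c.arrive(2);    // broker's redelivery of 2
        c.arrive(3);    // broker's redelivery of 3
        return c.incomingMessages;
    }
}
```

`Case3Replay.replay()` returns `[3, 2, 3]`: message 3 appears twice and ahead of 2, and a filter based only on `startMessageId = 1` cannot tell the stale push from the redelivery.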

These are just a few cases I thought of quickly, and there are surely
others I haven't considered. Are you sure we can handle all of them correctly?

> The problem of "the consumer doesn't know" seems like something that
> is reasonably within the protocol's responsibilities. In this case, an
> event happens on the broker, and the broker can tell the consumer.

I don't think a simple protocol change can solve these problems.
We can't guarantee that every consumer receives the broker's
reset-cursor notification. If a consumer is reconnecting when the
cursor is reset, the broker can't send it the reset-cursor notification,
right? In that case, the consumer still doesn't know about the reset, right?
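
A minimal model of the gap described above, assuming a hypothetical one-shot reset notification that the broker pushes only over live connections (`ResetNotificationReplay` and `ConsumerState` are illustrative names, not Pulsar API): a consumer that is reconnecting at the moment of the reset never receives it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a one-shot reset notification; no Pulsar API involved.
class ResetNotificationReplay {
    static final class ConsumerState {
        boolean connected = true;
        boolean sawReset = false;
    }

    final List<ConsumerState> consumers = new ArrayList<>();

    // The broker resets the cursor and pushes the notification only over
    // connections that are live at that instant.
    void resetCursor() {
        for (ConsumerState c : consumers) {
            if (c.connected) {
                c.sawReset = true;
            }
        }
    }

    static boolean[] replay() {
        ResetNotificationReplay broker = new ResetNotificationReplay();
        ConsumerState a = new ConsumerState();
        ConsumerState b = new ConsumerState();
        broker.consumers.add(a);
        broker.consumers.add(b);
        b.connected = false;  // b is in the middle of reconnecting
        broker.resetCursor(); // admin resets the cursor at this moment
        b.connected = true;   // b finishes reconnecting
        return new boolean[] {a.sawReset, b.sawReset};
    }
}
```

`replay()` returns `{true, false}`. In this model, a one-shot push is not enough on its own; the reconnect path would also have to learn that a reset happened.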


[0] https://github.com/apache/pulsar/blob/30d2469086fea989ac8baf059df8e69c66a68d89/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L135
[1] https://github.com/apache/pulsar/blob/30d2469086fea989ac8baf059df8e69c66a68d89/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L440-L454
[2] https://github.com/apache/pulsar/blob/30d2469086fea989ac8baf059df8e69c66a68d89/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java#L875-L892
>
> Thanks,
> Michael
>
> [0] https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L789-L795
>
> On Tue, Mar 21, 2023 at 9:46 AM Michael Marshall <mmarsh...@apache.org> wrote:
> >
> > > If we add the new field in CommandSubscribe, we must synchronize
> > > consumer reconnection with the user calling the receive and
> > > redeliverUnack methods. That will affect receive performance, and
> > > exposing synchronization on hot paths is not a good idea.
> >
> > I don't think this is a valid objection. I am pretty sure we already
> > synchronize in the relevant places in the consumer to solve the exact
> > race condition you're concerned about: [0] [1].
> >
> > My proposed operation is to keep track of the latest message id that
> > the application has seen, and then tell the broker that id when
> > sending the Subscribe command. We already do similar logic here [2]
> > [3], but instead of getting the first message id the consumer hasn't
> > seen, we'll get the latest message id seen.
> >
> > Regarding performance, the PIP doesn't touch on how it will filter out
> > messages. What is the planned approach? In my understanding, the
> > client will keep track of the latest message id that the application
> > has seen and then will need to compare that message id against every
> > new message. As such, it seems like telling the broker where to start
> > instead of naively checking a filter on every message would be
> > cheaper.
> >
> > > As described in the Compatibility section of the PIP, the client
> > > consumer doesn't know when Pulsar Admin resets the cursor.
> >
> > The problem of "the consumer doesn't know" seems like something that
> > is reasonably within the protocol's responsibilities. In this case, an
> > event happens on the broker, and the broker can tell the consumer.
> >
> > > * <p>Consumers should be closed when the server resets the cursor
> > > * and restarted after the cursor reset succeeds. Otherwise,
> > > * the consumer will not receive the historical messages.
> >
> > This is introducing a confusing edge case that requires reading a
> > Javadoc in order to understand. That seems risky to me, and I do not
> > think we should add such an edge case. A new protocol message would
> > easily handle it and make it transparent to the application.
> >
> > Thanks,
> > Michael
> >
> > [0] https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L826-L912
> > [1] https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L1870-L1876
> > [2] https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L789-L795
> > [3] https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L922-L960
> >
> > On Tue, Mar 21, 2023 at 8:58 AM Yubiao Feng
> > <yubiao.f...@streamnative.io.invalid> wrote:
> > >
> > > +1
> > >
> > > Hi, Bo :
> > >
> > > Thanks for your explanation. That makes sense to me.
> > >
> > > Thanks,
> > > Yubiao Feng
> > >
> > > On Mon, Mar 20, 2023 at 10:21 PM 丛搏 <congbobo...@gmail.com> wrote:
> > >
> > > > Hi, pulsar community:
> > > >
> > > > I started a PIP about `Client consumer filter received messages`.
> > > >
> > > > PIP: https://github.com/apache/pulsar/issues/19864
> > > >
> > > > Thanks,
> > > > Bo
> > > >
