Hi, Michael

> In this case, the consumer does not have the source of truth for the
> readPosition. It would leave the new protocol field for `readPosition`
> empty and the broker would use its source of truth for the read
> position.

But in this case, the application thread has already received all of the
messages, so we still need a correct `startPosition`, right? With your
approach, the consumer would be treated as if it had not received any
messages.
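Here is a minimal single-threaded sketch of the point above. It assumes a hypothetical `ReadPositionModel` class and a simplified `Pos(ledgerId, entryId)` record that stands in for Pulsar's `MessageId`. Neither is part of the Pulsar client. The sketch compares two read positions: one taken only from the receiver queue, and one that also remembers the last message the application dequeued. It deliberately ignores the locking questions debated further down the thread.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Optional;

// Hypothetical, single-threaded model of a consumer's receiver queue.
// This is NOT Pulsar's ConsumerImpl; all names are illustrative only.
class ReadPositionModel {
    // Hypothetical stand-in for a message id: (ledgerId, entryId).
    record Pos(long ledgerId, long entryId) {}

    private final Deque<Pos> incomingMessages = new ArrayDeque<>();
    private Pos lastDequeued; // null until the application receives a message

    // Called when the broker pushes a message to the client.
    void enqueue(Pos p) {
        incomingMessages.addLast(p);
    }

    // Called when the application receives a message; records what it has seen.
    Pos receive() {
        Pos p = incomingMessages.pollFirst();
        if (p != null) {
            lastDequeued = p;
        }
        return p;
    }

    // Read position taken only from the most recently added queued message.
    // It is empty once the application has drained the queue.
    Optional<Pos> readPositionFromQueueOnly() {
        return Optional.ofNullable(incomingMessages.peekLast());
    }

    // Falls back to the last message the application dequeued.
    Optional<Pos> readPositionWithDequeueTracking() {
        Pos last = incomingMessages.peekLast();
        return Optional.ofNullable(last != null ? last : lastDequeued);
    }
}
```

Suppose one message is enqueued and then received. `readPositionFromQueueOnly()` is then empty, so the client would send nothing and the broker would fall back to its own read position. `readPositionWithDequeueTracking()` still returns the position (1, 1).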
>
> > Why would we call `BlockingQueue.take` inside a `synchronized` block?
> > That is bad code.
>
> We don't need to synchronize this code here because the logic will
> come after the consumer has been disconnected from broker a and before
> it is connected to broker b.

If the application takes a message from the queue and then the consumer
reconnects, can the subscribe command use the right `startPosition`?
For example:

1. The application receives one message with `MessageId = 1`.
2. The consumer reconnects, finds that the queue is empty, and
   `lastDequeuedMessageId` has not changed yet.
3. The consumer sends a subscribe command with `MessageId.earliest`, so the
   message with `MessageId = 1` will be redelivered from the broker to the
   client consumer, right?

As this example shows, the application can receive `MessageId = 1` a second
time, right?

> We would not need to lock here because we do not enqueue new messages
> after we've been disconnected from the broker and before we've sent
> CommandSubscribe.

As we can see in the code [0], the thread has changed. Where do we
guarantee that no new messages will come in?

>
> Ultimately, I think a protocol solution will yield better results,
> especially since we'll want to implement this feature in the other
> client languages.

The cursor-reset problem can be addressed in the future, but can you
ensure correctness in all of the cases I mentioned above? IMO, my design
only changes the client, so the broker does not need any changes. It is
simple, it is easy to implement, and I can make sure it is completely
correct. With your design, I currently do not see a closed-loop
implementation that can be achieved, at least in the Java client.

Thanks,
Bo

>
> Thanks,
> Michael
>
> On Tue, Mar 21, 2023 at 9:29 PM 丛搏 <congbobo...@gmail.com> wrote:
> >
> > Hi, Michael:
> >
> > Michael Marshall <mmarsh...@apache.org> wrote on Tue, Mar 21, 2023 at 23:17:
> > >
> > >
> > > One more point.
> > > Instead of keeping track of the latest message seen by
> > > the application, the logic in my solution would actually just check
> > > the last message in the `incomingMessages` queue (as in the most
> > > recently added), and use that as the read position in the subscribe
> > > command. If we made this change, we would have to change this code [0]
> > > to not drop the `incomingMessages` queue.
> >
> > case 1:
> > How do we define the message that the application has seen?
> > I think it is [0]. When the `incomingMessages` queue is empty,
> > how do we get the correct `startPosition`?
> > I think we would have to lock the receive logic in [1]:
> > ```
> > synchronized (this) {
> >     message = incomingMessages.take();
> >     messageProcessed(message);
> > }
> > ```
> > Why would we call `BlockingQueue.take` inside a `synchronized` block?
> > That is bad code.
> >
> > case 2:
> > If we subscribe with `startMessageId`, we should also lock all of the
> > enqueue logic, like [2], and check the consumer's current state:
> > ```
> > synchronized (this) {
> >     if (consumer.isConnected) {
> >         if (canEnqueueMessage(message) && incomingMessages.offer(message)) {
> >             // After we have enqueued the messages on `incomingMessages` queue, we cannot touch the message
> >             // instance anymore, since for pooled messages, this instance was possibly already been released
> >             // and recycled.
> >             INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, messageSize);
> >             getMemoryLimitController().ifPresent(limiter -> limiter.forceReserveMemory(messageSize));
> >             updateAutoScaleReceiverQueueHint();
> >         }
> >     }
> > }
> > ```
> >
> > case 3:
> > The subscribe command is sent to the broker with `startMessageId = 1`,
> > and the messages the broker pushes have not yet entered the
> > `incomingMessages` queue when the application invokes redeliver. In this
> > case, the messages we filter would not be correct, right?
> >
> > These are some cases that I thought of quickly, and there must be
> > others that I haven't thought of.
> > Are you sure we can handle these problems correctly?
> >
> > > The problem of "the consumer doesn't know" seems like something that
> > > is reasonably within the protocol's responsibilities. In this case, an
> > > event happens on the broker, and the broker can tell the consumer.
> >
> > I don't think a simple protocol change can solve these problems. We
> > can't promise that every consumer will receive the broker's reset
> > cursor request. When the consumer is reconnecting, the broker can't send
> > the reset cursor request to it, right? In that case, the consumer is
> > still unaware, right?
> >
> > [0] https://github.com/apache/pulsar/blob/30d2469086fea989ac8baf059df8e69c66a68d89/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L135
> > [1] https://github.com/apache/pulsar/blob/30d2469086fea989ac8baf059df8e69c66a68d89/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L440-L454
> > [2] https://github.com/apache/pulsar/blob/30d2469086fea989ac8baf059df8e69c66a68d89/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java#L875-L892
> >
> > > Thanks,
> > > Michael
> > >
> > > [0] https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L789-L795
> > >
> > > On Tue, Mar 21, 2023 at 9:46 AM Michael Marshall <mmarsh...@apache.org>
> > > wrote:
> > > >
> > > > > If we add the new field in CommandSubscribe, we must synchronize
> > > > > consumer reconnection with the user calling the receive and
> > > > > redeliverUnack methods. That will affect receive performance, and
> > > > > exposing synchronization on hot paths is not a good idea.
> > > >
> > > > I don't think this is a valid objection.
> > > > I am pretty sure we already
> > > > synchronize in the relevant places in the consumer to solve the exact
> > > > race condition you're concerned about: [0] [1].
> > > >
> > > > My proposed operation is to keep track of the latest message id that
> > > > the application has seen, and then tell the broker that id when
> > > > sending the Subscribe command. We already do similar logic here [2]
> > > > [3], but instead of getting the first message id the consumer hasn't
> > > > seen, we'll get the latest message id seen.
> > > >
> > > > Regarding performance, the PIP doesn't touch on how it will filter out
> > > > messages. What is the planned approach? In my understanding, the
> > > > client will keep track of the latest message id that the application
> > > > has seen and then will need to compare that message id against every
> > > > new message. As such, it seems like telling the broker where to start
> > > > instead of naively checking a filter on every message would be
> > > > cheaper.
> > > >
> > > > > As described in the Compatibility section of the PIP, the client
> > > > > consumer doesn't know about a Pulsar Admin cursor reset.
> > > >
> > > > The problem of "the consumer doesn't know" seems like something that
> > > > is reasonably within the protocol's responsibilities. In this case, an
> > > > event happens on the broker, and the broker can tell the consumer.
> > > >
> > > > > * <p>Consumers should close when the server resets the cursor,
> > > > > * when the cursor reset succeeds, and then restart. Otherwise,
> > > > > * the consumer will not receive the history messages.
> > > >
> > > > This is introducing a confusing edge case that requires reading a
> > > > Javadoc in order to understand. That seems risky to me, and I do not
> > > > think we should add such an edge case. A new protocol message would
> > > > easily handle it and make it transparent to the application.
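The proposal above can be sketched as follows. The client tells the broker the latest message id the application has seen. When the client sends nothing, the broker falls back to its own read position. `SubscribeRequest`, its optional `readPosition` component, `Pos`, and `resolveStart` are all hypothetical. The thread describes `readPosition` as a new protocol field, so nothing here mirrors the existing `CommandSubscribe` definition. Advancing by `entryId + 1` is a simplification that ignores ledger boundaries.

```java
import java.util.Optional;

// Hypothetical sketch of an optional read position in a subscribe request.
// None of these types exist in Pulsar; they only illustrate the proposal.
class SubscribeReadPositionSketch {
    // Hypothetical stand-in for a message id: (ledgerId, entryId).
    record Pos(long ledgerId, long entryId) {}

    // Client -> broker: readPosition is the latest id the application has seen,
    // or empty when the client does not know it.
    record SubscribeRequest(String subscription, Optional<Pos> readPosition) {}

    // Broker side: start right after the client's read position if one was sent.
    // Otherwise start right after the cursor's mark-delete position, the broker's
    // own source of truth. Simplification: entryId + 1 ignores ledger rollover.
    static Pos resolveStart(SubscribeRequest request, Pos markDeletePosition) {
        Pos after = request.readPosition().orElse(markDeletePosition);
        return new Pos(after.ledgerId(), after.entryId() + 1);
    }
}
```

In this shape, already-seen messages are skipped once, at subscribe time, instead of the client comparing every incoming message against a filter. The sketch does not address the objection raised in the thread: how the client computes `readPosition` safely while messages are being received concurrently.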
> > > >
> > > > Thanks,
> > > > Michael
> > > >
> > > > [0] https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L826-L912
> > > > [1] https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L1870-L1876
> > > > [2] https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L789-L795
> > > > [3] https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L922-L960
> > > >
> > > > On Tue, Mar 21, 2023 at 8:58 AM Yubiao Feng
> > > > <yubiao.f...@streamnative.io.invalid> wrote:
> > > > >
> > > > > +1
> > > > >
> > > > > Hi, Bo:
> > > > >
> > > > > Thanks for your explanation. That makes sense to me.
> > > > >
> > > > > Thanks,
> > > > > Yubiao Feng
> > > > >
> > > > > On Mon, Mar 20, 2023 at 10:21 PM 丛搏 <congbobo...@gmail.com> wrote:
> > > > > >
> > > > > > Hi, Pulsar community:
> > > > > >
> > > > > > I started a PIP about `Client consumer filter received messages`.
> > > > > >
> > > > > > PIP: https://github.com/apache/pulsar/issues/19864
> > > > > >
> > > > > > Thanks,
> > > > > > Bo
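The PIP announced above proposes that the client consumer filter the messages it receives. Below is a minimal sketch of such a filter. It assumes an exclusive or failover subscription, where messages arrive in position order. `ReceivedMessageFilter`, `Pos`, `accept`, and `reset` are hypothetical names, not the PIP's actual design. After a reconnect, any message at or before the last position handed to the application is dropped. A redelivery or seek request clears that state so the requested messages are not filtered out, which is the concern in case 3 of the thread.

```java
// Hypothetical client-side filter for redelivered messages. Not the PIP's code.
// Assumes an exclusive or failover subscription, where messages arrive in order.
class ReceivedMessageFilter {
    // Hypothetical stand-in for a message id, ordered by (ledgerId, entryId).
    record Pos(long ledgerId, long entryId) implements Comparable<Pos> {
        @Override
        public int compareTo(Pos other) {
            int byLedger = Long.compare(ledgerId, other.ledgerId);
            return byLedger != 0 ? byLedger : Long.compare(entryId, other.entryId);
        }
    }

    private Pos lastDelivered; // null: nothing handed to the application yet

    // Returns true if the message should be handed to the application, and
    // records it as delivered; returns false for a replay of an earlier message.
    synchronized boolean accept(Pos position) {
        if (lastDelivered != null && position.compareTo(lastDelivered) <= 0) {
            return false;
        }
        lastDelivered = position;
        return true;
    }

    // Hypothetical hook, meant to be called from the consumer's
    // redeliverUnacknowledgedMessages() or seek(): clears the state so that
    // messages the application explicitly asked for again are not dropped.
    synchronized void reset() {
        lastDelivered = null;
    }
}
```

The per-message cost is one comparison under a lock, which is the hot-path cost questioned in the thread. Nothing tells this filter about an admin cursor reset, so it would drop the rewound messages. That is the edge case the proposed Javadoc documents.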