Hi, Yunze:

> It's better to describe how it could bring the benefit to transaction
> use cases, since now it's designed to be a configuration related to
> the transaction.
Sorry that I haven't explained in detail why transactions need it.
Let's look at a simple example:

```
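Transaction txn = getTxn();
int num = 0;
MessageId messageId = null;
while (num < 10) {
    Message<?> msg = consumer.receive(5, TimeUnit.SECONDS);
    if (msg == null) {
        break; // receive timed out
    }
    messageId = msg.getMessageId();
    producer.newMessage(txn).value(messageId.toString()).sendAsync();
    num++;
}
if (messageId != null) {
    // Pass txn so the cumulative ack is part of the same transaction as the sends.
    consumer.acknowledgeCumulativeAsync(messageId, txn);
}
txn.commit();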
```
This example shows a transaction that atomically acknowledges the
10 received messages and produces 10 new ones.
If the messages we receive are duplicates, the messages we
produce will also be duplicates. Therefore, in failover and exclusive
subscription modes, we need to ensure that the messages we receive
are neither repeated nor out of order. The client consumer does not
currently provide this guarantee, and the guarantee must be exact;
otherwise, it will break the exactly-once semantics.
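A minimal sketch of the client-side filtering this PIP proposes, under stated assumptions: a position is a hypothetical `Position(ledgerId, entryId)` record ordered by ledger and then entry (standing in for a Pulsar `MessageId` on one non-partitioned topic, ignoring batch indexes), and `ReceivedMessageFilter` is an illustrative name, not the Pulsar client API. The filter remembers the last position handed to the application and drops anything at or before it, so a redelivery after a reconnect does not reach the transaction's produce step twice:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration, not the Pulsar client API.
public class ReceivedMessageFilter {
    /** Stand-in for a message ID: ordered by ledger, then by entry. */
    record Position(long ledgerId, long entryId) implements Comparable<Position> {
        @Override
        public int compareTo(Position other) {
            int byLedger = Long.compare(ledgerId, other.ledgerId);
            return byLedger != 0 ? byLedger : Long.compare(entryId, other.entryId);
        }
    }

    // Last position handed to the application; null until the first delivery.
    private Position lastDelivered;

    /** Returns true if the message should reach the application, false if it is a duplicate. */
    public synchronized boolean accept(Position position) {
        if (lastDelivered != null && position.compareTo(lastDelivered) <= 0) {
            return false; // already delivered, e.g. redelivered after a reconnect
        }
        lastDelivered = position;
        return true;
    }

    public static void main(String[] args) {
        ReceivedMessageFilter filter = new ReceivedMessageFilter();
        // Entries 1-3 arrive, then a reconnect redelivers 2-3 before entry 4 arrives.
        long[] arrivals = {1, 2, 3, 2, 3, 4};
        List<Long> delivered = new ArrayList<>();
        for (long entry : arrivals) {
            if (filter.accept(new Position(0, entry))) {
                delivered.add(entry);
            }
        }
        System.out.println(delivered); // prints [1, 2, 3, 4]
    }
}
```

The comparison per message is O(1); what it cannot do on its own is tell a genuine duplicate apart from a message the broker replays on purpose.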


> With this proposal and the option enabled, all these cases will filter
> the messages. That's why I think we have to consider the case for
> resetting cursors because it makes things worse.

Yes, this configuration may make resetting the cursor more
difficult, but without it, it is difficult to guarantee the
correctness of the transaction. Although it makes the reset
cursor worse, it ensures correctness.
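The cost of that trade-off can be illustrated with an even simpler sketch, assuming the filter keeps only a high-water mark of the last delivered entry ID (a hypothetical `HighWaterMarkFilter`, not Pulsar code). After an admin resets the cursor to an earlier entry, every replayed message looks like a duplicate to the existing consumer; only a consumer with fresh filter state, for example one that has been closed and recreated, sees them:

```java
import java.util.ArrayList;
import java.util.List;

public class ResetCursorDemo {
    /** Hypothetical filter: remembers only the highest entry ID delivered so far. */
    static final class HighWaterMarkFilter {
        private long lastDelivered = -1;

        boolean accept(long entryId) {
            if (entryId <= lastDelivered) {
                return false; // treated as a duplicate
            }
            lastDelivered = entryId;
            return true;
        }
    }

    /** Feeds entries through the filter and returns what the application would see. */
    static List<Long> deliver(HighWaterMarkFilter filter, long... entries) {
        List<Long> seen = new ArrayList<>();
        for (long entry : entries) {
            if (filter.accept(entry)) {
                seen.add(entry);
            }
        }
        return seen;
    }

    public static void main(String[] args) {
        HighWaterMarkFilter filter = new HighWaterMarkFilter();
        System.out.println(deliver(filter, 0, 1, 2, 3, 4)); // [0, 1, 2, 3, 4]

        // An admin resets the cursor to entry 2 and the broker replays 2..4.
        // The same consumer (same filter state) drops every replayed entry.
        System.out.println(deliver(filter, 2, 3, 4)); // []

        // A recreated consumer starts with a fresh filter, so the replay is visible.
        System.out.println(deliver(new HighWaterMarkFilter(), 2, 3, 4)); // [2, 3, 4]
    }
}
```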

For transactions, we must first consider correctness, and only
then which features to support (e.g., reset cursor).

Thanks,
Bo
>
> The three cases above do not involve transaction operations. So it
> would be better to understand the benefit if you can show some typical
> cases involved with transaction operations.
>
> Thanks,
> Yunze
>
> On Wed, Mar 29, 2023 at 12:02 PM 丛搏 <congbobo...@gmail.com> wrote:
> >
> > Hi, all :
> >
> > Thanks to everyone who discussed it.
> >
> > Our current concerns are the following:
> >
> > 1. Filtering in the client consumer is not as efficient as
> > handling it directly through startMessageId.
> > 2. It does not support resetting the cursor.
> >
> > My previous PIP description adds the configuration to
> > ConsumerBuilder. The definition of that configuration is not
> > clear, and it would cause users a lot of trouble.
> >
> > We can add a separate configuration that is only used for
> > acks with transactions. A simple example:
> >
> > ```
> > ConsumerBuilder<T> transactionConfiguration(
> >         ConsumerTransactionConfiguration configuration);
> >
> > @Builder
> > @Data
> > @NoArgsConstructor
> > @AllArgsConstructor
> > @InterfaceAudience.Public
> > @InterfaceStability.Stable
> >
> > public class ConsumerTransactionConfiguration {
> >    boolean isFilterReceivedMessagesEnabled = false;
> > }
> >
> > ```
> >
> > If the startMessageId design can provide this feature,
> > we can remove the configuration; or, if there is already a
> > closed-loop solution based on startMessageId, I agree to use startMessageId.
> >
> > As for the reset cursor, I think it is another problem,
> > not related to this PIP.
> >
> > Thanks,
> > Bo
> >
> > On Fri, Mar 24, 2023 at 6:53 PM 丛搏 <congbobo...@gmail.com> wrote:
> > >
> > > Hi, Michael:
> > >
> > > I thought about it carefully, and using 'startMessageId'
> > > is indeed a good idea. But it is more complicated: we
> > > need to ensure its absolute correctness and take
> > > performance into consideration. If you can come up
> > > with a closed-loop solution based on 'startMessageId',
> > > I support you. If it can't account for both performance
> > > and correctness, I think we should combine our two
> > > solutions: yours ensures that, to a certain degree,
> > > messages are not redelivered, which reduces the overhead
> > > of redelivering many messages, and mine is responsible
> > > for final consistency.
> > >
> > > Thanks,
> > > Bo
> > >
> > > On Wed, Mar 22, 2023 at 2:22 PM Michael Marshall <mmarsh...@apache.org> wrote:
> > > >
> > > > Because we already send the `startMessageId`, there is a chance where
> > > > we might not even need to update the protocol for the
> > > > CommandSubscribe. In light of that, I quickly put together a PR
> > > > showing how that field might be used to inform the broker where to
> > > > start the read position for the cursor.
> > > >
> > > > https://github.com/apache/pulsar/pull/19892
> > > >
> > > > The PR is not complete, but it does convey the general idea. I wrote
> > > > additional details in the draft's description.
> > > >
> > > > Thanks,
> > > > Michael
> > > >
> > > > On Tue, Mar 21, 2023 at 11:31 PM Michael Marshall <mmarsh...@apache.org> wrote:
> > > > >
> > > > > I am not following your objections to the protocol solution. It might
> > > > > be more productive if I provided a draft PR with a sample
> > > > > implementation. I'm not sure that I'll have time, but I'll try to put
> > > > > something together this week.
> > > > >
> > > > > > At least it will simplify the process of using cumulative ack with the
> > > > > > transaction.
> > > > >
> > > > > Is this the underlying motivation for the PIP?
> > > > >
> > > > > From my perspective, the PIP is seeking to decrease duplicate messages
> > > > > experienced due to disconnections from the broker.
> > > > >
> > > > > > The problem of the resetting cursor can be optimized in the future
> > > > >
> > > > > Why should we push off solving this problem? It seems fundamental to
> > > > > this PIP and should not be ignored. At the very least, I think we need
> > > > > to have an idea of what the future solution would be before we defer
> > > > > its implementation.
> > > > >
> > > > > Thanks,
> > > > > Michael
> > > > >
> > > > >
> > > > > On Tue, Mar 21, 2023 at 10:52 PM 丛搏 <congbobo...@gmail.com> wrote:
> > > > > >
> > > > > > Hi, Michael
> > > > > > > In this case, the consumer does not have the source of truth for the
> > > > > > > readPosition. It would leave the new protocol field for `readPosition`
> > > > > > > empty and the broker would use its source of truth for the read
> > > > > > > position.
> > > > > > The application thread has already received all the messages, so we
> > > > > > still need a correct `startPosition`, right? But with your approach, we
> > > > > > would assume the consumer hasn't received any messages.
> > > > > >
> > > > > > >
> > > > > > > > Why would we invoke `BlockingQueue.take` inside a `synchronized`
> > > > > > > > block? It's bad code.
> > > > > > >
> > > > > > > We don't need to synchronize this code here because the logic will
> > > > > > > come after the consumer has been disconnected from broker a and before
> > > > > > > it is connected to broker b.
> > > > > > If the application takes a message from the queue and then the consumer
> > > > > > reconnects, can the CommandSubscribe use the right startPosition? Example:
> > > > > > 1. The application receives one message with `MessageId = 1`.
> > > > > > 2. The consumer reconnects, finds that the queue is empty, and
> > > > > > `lastDequeuedMessageId` has not changed.
> > > > > > 3. The consumer sends a CommandSubscribe with `MessageId.earliest`, so
> > > > > > `MessageId = 1` will be redelivered from the broker to the client
> > > > > > consumer, right?
> > > > > >
> > > > > > As the example shows, the application can receive `MessageId = 1`
> > > > > > again, right?
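Step 2 is a race between the application thread and the reconnect logic. A minimal single-threaded sketch can replay that interleaving by hand. It assumes the reconnect derives the start position from the newest buffered message or, if the buffer is empty, from the last recorded dequeue, and that the broker then redelivers everything after that position; every name here is hypothetical, not `ConsumerImpl` code:

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class ReconnectRaceDemo {
    // Hypothetical stand-in for MessageId.earliest: "nothing seen yet".
    static final long EARLIEST = -1;

    // Client-side buffer, analogous to incomingMessages.
    final Deque<Long> incoming = new ArrayDeque<>();
    // Last position recorded as handed to the application.
    long lastDequeued = EARLIEST;

    /** Start position for the subscribe: newest buffered entry, else the last recorded dequeue. */
    long startPositionOnReconnect() {
        Long newestBuffered = incoming.peekLast();
        return newestBuffered != null ? newestBuffered : lastDequeued;
    }

    public static void main(String[] args) {
        ReconnectRaceDemo consumer = new ReconnectRaceDemo();
        consumer.incoming.add(1L);

        // Application thread: takes message 1 but has not recorded it yet.
        long received = consumer.incoming.poll();

        // Reconnect runs in this window: the buffer is empty and lastDequeued is unchanged.
        long start = consumer.startPositionOnReconnect();
        System.out.println("received=" + received + ", start=" + start); // received=1, start=-1

        // The bookkeeping lands too late; the broker redelivers every entry after -1, including 1.
        consumer.lastDequeued = received;
    }
}
```

Closing that window requires the take and the bookkeeping to be atomic with respect to the reconnect, which is where the locking discussion in this thread comes from.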
> > > > > > > We would not need to lock here because we do not enqueue new messages
> > > > > > > after we've been disconnected from the broker and before we've sent
> > > > > > > CommandSubscribe.
> > > > > > As we can see in the code [0], the thread has changed.
> > > > > > Where do we guarantee that no new messages will come in?
> > > > > >
> > > > > > >
> > > > > > > Ultimately, I think a protocol solution will yield better results,
> > > > > > > especially since we'll want to implement this feature in the other
> > > > > > > client languages.
> > > > > > The problem of resetting the cursor can be optimized in the future,
> > > > > > but can you ensure correctness in all the cases I mentioned above?
> > > > > > IMO, with my design only the client changes, and the broker doesn't
> > > > > > need any changes. It's simple and easy to implement, and I can make
> > > > > > sure it's completely correct. In your design, I currently do not see
> > > > > > a closed-loop implementation, at least in the Java client.
> > > > > >
> > > > > > Thanks,
> > > > > > Bo
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Michael
> > > > > > >
> > > > > > > On Tue, Mar 21, 2023 at 9:29 PM 丛搏 <congbobo...@gmail.com> wrote:
> > > > > > > >
> > > > > > > > Hi, Michael:
> > > > > > > >
> > > > > > > > On Tue, Mar 21, 2023 at 11:17 PM Michael Marshall <mmarsh...@apache.org> wrote:
> > > > > > > >
> > > > > > > > >
> > > > > > > > > One more point. Instead of keeping track of the latest message seen by
> > > > > > > > > the application, the logic in my solution would actually just check
> > > > > > > > > the last message in the `incomingMessages` queue (as in the most
> > > > > > > > > recently added), and use that as the read position in the subscribe
> > > > > > > > > command. If we made this change, we would have to change this code [0]
> > > > > > > > > to not drop the `incomingMessages` queue.
> > > > > > > >
> > > > > > > > case 1:
> > > > > > > > How do we define the message that the application has seen?
> > > > > > > > I think it is [0]. When the `incomingMessages` queue is empty,
> > > > > > > > how do we get the correct `startPosition`?
> > > > > > > > I think we would have to lock the receive logic in [1]:
> > > > > > > > ```
> > > > > > > > synchronized (this) {
> > > > > > > >     message = incomingMessages.take();
> > > > > > > >     messageProcessed(message);
> > > > > > > > }
> > > > > > > > ```
> > > > > > > > Why would we invoke `BlockingQueue.take` inside a `synchronized`
> > > > > > > > block? It's bad code.
> > > > > > > >
> > > > > > > > case 2:
> > > > > > > > If we subscribe with `startMessageId`, we should also lock any enqueue
> > > > > > > > logic, like [2], and check the consumer's current state:
> > > > > > > > ```
> > > > > > > > synchronized (this) {
> > > > > > > >     if (consumer.isConnected) {
> > > > > > > >         if (canEnqueueMessage(message) && incomingMessages.offer(message)) {
> > > > > > > >             // After we have enqueued the messages on the `incomingMessages` queue,
> > > > > > > >             // we cannot touch the message instance anymore, since for pooled
> > > > > > > >             // messages, this instance was possibly already released and recycled.
> > > > > > > >             INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, messageSize);
> > > > > > > >             getMemoryLimitController().ifPresent(limiter ->
> > > > > > > >                     limiter.forceReserveMemory(messageSize));
> > > > > > > >             updateAutoScaleReceiverQueueHint();
> > > > > > > >         }
> > > > > > > >     }
> > > > > > > > }
> > > > > > > > ```
> > > > > > > > case 3:
> > > > > > > > When the CommandSubscribe is sent to the broker with `startMessageId = 1`,
> > > > > > > > a message pushed by the broker has not yet entered `incomingMessages`,
> > > > > > > > and then the application invokes redeliver. In this case, can we be
> > > > > > > > sure that the messages are filtered correctly?
> > > > > > > >
> > > > > > > > These are just some cases I thought of, and there must be
> > > > > > > > others I haven't. Are you sure we can handle these problems correctly?
> > > > > > > >
> > > > > > > > > The problem of "the consumer doesn't know" seems like something that
> > > > > > > > > is reasonably within the protocol's responsibilities. In this case, an
> > > > > > > > > event happens on the broker, and the broker can tell the consumer.
> > > > > > > >
> > > > > > > > I don't think a simple protocol change can solve these problems.
> > > > > > > > We can't promise that every consumer receives the broker's reset
> > > > > > > > cursor request. While a consumer is reconnecting, the broker can't
> > > > > > > > send it the reset cursor request, right? In that case, the consumer
> > > > > > > > is still unaware, right?
> > > > > > > >
> > > > > > > >
> > > > > > > > [0] https://github.com/apache/pulsar/blob/30d2469086fea989ac8baf059df8e69c66a68d89/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L135
> > > > > > > > [1] https://github.com/apache/pulsar/blob/30d2469086fea989ac8baf059df8e69c66a68d89/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L440-L454
> > > > > > > > [2] https://github.com/apache/pulsar/blob/30d2469086fea989ac8baf059df8e69c66a68d89/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java#L875-L892
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > > Michael
> > > > > > > > >
> > > > > > > > > [0] https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L789-L795
> > > > > > > > >
> > > > > > > > > On Tue, Mar 21, 2023 at 9:46 AM Michael Marshall <mmarsh...@apache.org> wrote:
> > > > > > > > > >
> > > > > > > > > > > If we add the new field in CommandSubscribe, we must ensure
> > > > > > > > > > > synchronization between consumer reconnection and the user
> > > > > > > > > > > calling the receive and redeliverUnack methods. That will affect
> > > > > > > > > > > receive performance; exposing synchronization on hot paths is not a
> > > > > > > > > > > good idea.
> > > > > > > > > >
> > > > > > > > > > I don't think this is a valid objection. I am pretty sure we already
> > > > > > > > > > synchronize in the relevant places in the consumer to solve the exact
> > > > > > > > > > race condition you're concerned about: [0] [1].
> > > > > > > > > >
> > > > > > > > > > My proposed operation is to keep track of the latest message id that
> > > > > > > > > > the application has seen, and then tell the broker that id when
> > > > > > > > > > sending the Subscribe command. We already do similar logic here [2]
> > > > > > > > > > [3], but instead of getting the first message id the consumer hasn't
> > > > > > > > > > seen, we'll get the latest message id seen.
> > > > > > > > > >
> > > > > > > > > > Regarding performance, the PIP doesn't touch on how it will filter out
> > > > > > > > > > messages. What is the planned approach? In my understanding, the
> > > > > > > > > > client will keep track of the latest message id that the application
> > > > > > > > > > has seen and then will need to compare that message id against every
> > > > > > > > > > new message. As such, it seems like telling the broker where to start
> > > > > > > > > > instead of naively checking a filter on every message would be
> > > > > > > > > > cheaper.
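A simplified count of the two approaches, assuming that without a start position the broker resumes right after the subscription's mark-delete position, and ignoring batching, individual acks, and partitions (entry IDs are plain longs and the helper names are hypothetical):

```java
import java.util.stream.LongStream;

public class RedeliveryCostDemo {
    /** Entries the broker sends when it starts reading just after startExclusive. */
    static long[] brokerSends(long startExclusive, long lastEntry) {
        return LongStream.rangeClosed(startExclusive + 1, lastEntry).toArray();
    }

    /** Entries a client-side filter must compare and drop (already seen by the application). */
    static long droppedByFilter(long[] sent, long lastSeen) {
        return LongStream.of(sent).filter(entry -> entry <= lastSeen).count();
    }

    public static void main(String[] args) {
        long markDelete = 2; // last entry covered by the persisted mark-delete position
        long lastSeen = 5;   // last entry the application has received
        long lastEntry = 9;  // newest entry in the topic

        // Client-side filter: the broker replays from the mark-delete position.
        long[] replay = brokerSends(markDelete, lastEntry);
        System.out.println(replay.length + " sent, "
                + droppedByFilter(replay, lastSeen) + " dropped"); // 7 sent, 3 dropped

        // Start position in CommandSubscribe: the broker skips what the application has seen.
        long[] direct = brokerSends(lastSeen, lastEntry);
        System.out.println(direct.length + " sent, "
                + droppedByFilter(direct, lastSeen) + " dropped"); // 4 sent, 0 dropped
    }
}
```

Both paths end with the same entries at the application; the difference is how many redundant entries cross the wire and are compared on the client, which is the efficiency concern raised above.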
> > > > > > > > > >
> > > > > > > > > > > As described in the Compatibility section of the PIP, the client consumer
> > > > > > > > > > > doesn't know about a Pulsar Admin reset cursor.
> > > > > > > > > >
> > > > > > > > > > The problem of "the consumer doesn't know" seems like something that
> > > > > > > > > > is reasonably within the protocol's responsibilities. In this case, an
> > > > > > > > > > event happens on the broker, and the broker can tell the consumer.
> > > > > > > > > >
> > > > > > > > > > > * <p>Consumers should be closed when the server resets the cursor and
> > > > > > > > > > > * restarted after the cursor reset succeeds. Otherwise,
> > > > > > > > > > > * the consumer will not receive the historical messages.
> > > > > > > > > >
> > > > > > > > > > This is introducing a confusing edge case that requires reading a
> > > > > > > > > > Javadoc in order to understand. That seems risky to me, and I do not
> > > > > > > > > > think we should add such an edge case. A new protocol message would
> > > > > > > > > > easily handle it and make it transparent to the application.
> > > > > > > > > >
> > > > > > > > > > Thanks,
> > > > > > > > > > Michael
> > > > > > > > > >
> > > > > > > > > > [0] https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L826-L912
> > > > > > > > > > [1] https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L1870-L1876
> > > > > > > > > > [2] https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L789-L795
> > > > > > > > > > [3] https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L922-L960
> > > > > > > > > >
> > > > > > > > > > On Tue, Mar 21, 2023 at 8:58 AM Yubiao Feng
> > > > > > > > > > <yubiao.f...@streamnative.io.invalid> wrote:
> > > > > > > > > > >
> > > > > > > > > > > +1
> > > > > > > > > > >
> > > > > > > > > > > Hi, Bo :
> > > > > > > > > > >
> > > > > > > > > > > Thanks for your explanation. That makes sense to me.
> > > > > > > > > > >
> > > > > > > > > > > Thanks,
> > > > > > > > > > > Yubiao Feng
> > > > > > > > > > >
> > > > > > > > > > > On Mon, Mar 20, 2023 at 10:21 PM 丛搏 <congbobo...@gmail.com> wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Hi, pulsar community:
> > > > > > > > > > > >
> > > > > > > > > > > > I started a PIP about `Client consumer filter received messages`.
> > > > > > > > > > > >
> > > > > > > > > > > > PIP: https://github.com/apache/pulsar/issues/19864
> > > > > > > > > > > >
> > > > > > > > > > > > Thanks,
> > > > > > > > > > > > Bo
> > > > > > > > > > > >
