BewareMyPower commented on PR #15413:
URL: https://github.com/apache/pulsar/pull/15413#issuecomment-1115325307
First, neither case should be valid. Users are responsible for making
the sequence id unique.
What this PR wants to solve is the **resend** case of an existing message.
For example, 4 messages might be split into two groups in key based batching:
- A: 0, 3 (i.e. messages whose keys are all "A", and the sequence ids are 0
and 3)
- B: 1, 2
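The grouping above can be sketched in plain Java. This is only an illustration of how messages end up grouped per key, not the client's actual batch container; the class and method names (`KeyBasedGroupingSketch`, `groupByKey`) are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class KeyBasedGroupingSketch {
    // Groups sequence ids by message key, keeping keys in first-seen order.
    // Hypothetical helper for illustration only.
    static Map<String, List<Long>> groupByKey(String[] keys, long[] sequenceIds) {
        Map<String, List<Long>> batches = new LinkedHashMap<>();
        for (int i = 0; i < keys.length; i++) {
            batches.computeIfAbsent(keys[i], k -> new ArrayList<>()).add(sequenceIds[i]);
        }
        return batches;
    }

    public static void main(String[] args) {
        String[] keys = {"A", "B", "B", "A"};
        long[] sequenceIds = {0, 1, 2, 3};
        // prints {A=[0, 3], B=[1, 2]}
        System.out.println(groupByKey(keys, sequenceIds));
    }
}
```

Note that the sequence ids inside batch A (0 and 3) are not contiguous, which is what makes the choice of the recorded sequence id matter.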
If the client didn't receive the ack receipt of batch A, it might resend
batch A (msg-0 and msg-3). There are two possible cases:
1. batch A has been persisted -> broker should not accept the resend
2. batch A has not been persisted -> broker should accept the resend
Assume the resent msg-0 and msg-3 are sent **individually (without batching)**.
The 1st case is easy to verify with a unit test.
```java
public void test() throws Exception {
    final String topic = "persistent://my-property/my-ns/test";
    admin.namespaces().setDeduplicationStatus("my-property/my-ns", true);
    final Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
            .topic(topic)
            .batcherBuilder(BatcherBuilder.KEY_BASED)
            .create();
    final List<TypedMessageBuilder<String>> messagesToSend = new ArrayList<>();
    messagesToSend.add(producer.newMessage().value("msg-0").key("A").sequenceId(0));
    messagesToSend.add(producer.newMessage().value("msg-1").key("B").sequenceId(1));
    messagesToSend.add(producer.newMessage().value("msg-2").key("B").sequenceId(2));
    messagesToSend.add(producer.newMessage().value("msg-3").key("A").sequenceId(3));
    final List<CompletableFuture<MessageId>> futures = new ArrayList<>();
    messagesToSend.forEach(msg -> futures.add(msg.sendAsync()));
    producer.flush(); // flush the messages in the batch container
    final List<MessageId> ids =
            futures.stream().map(CompletableFuture::join).collect(Collectors.toList());
    // Resend messages in batch A
    ids.add(messagesToSend.get(0).send());
    ids.add(messagesToSend.get(3).send());
    // TODO: print the ids
}
```
| | Before this patch | After this patch |
| ----- | ----------------- | ---------------- |
| msg-0 | rejected | rejected |
| msg-3 | **accepted** | rejected |
Before this patch, the resend of `msg-3` is accepted. This behavior must be
wrong because `msg-0` and `msg-3` have already been persisted.
For the 2nd case, we can modify the test to:
```java
final List<CompletableFuture<MessageId>> futures = new ArrayList<>();
// Assume msg-0 and msg-3 are lost
futures.add(messagesToSend.get(1).sendAsync());
futures.add(messagesToSend.get(2).sendAsync());
producer.flush(); // flush the messages in the batch container
```
| | Before this patch | After this patch |
| ----- | ----------------- | ---------------- |
| msg-0 | rejected | rejected |
| msg-3 | accepted | accepted |
The behaviors are the same, although for different reasons: before this patch,
`msg-3` is accepted because `3 > 1`, while after this patch it is accepted
because `3 > 2`.
**It's a pity to see `msg-0` rejected as a duplicate message.** It's
true that this PR does not solve this problem either.
However, if we resend `msg-0` and `msg-3` as a batch:
```java
// Resend messages in batch A
futures.add(messagesToSend.get(0).sendAsync());
futures.add(messagesToSend.get(3).sendAsync());
final List<MessageId> newIds =
futures.stream().map(CompletableFuture::join).collect(Collectors.toList());
```
| | Before this patch | After this patch |
| ----- | ----------------- | ------------------- |
| msg-0 | **rejected** | **Never Completed** |
| msg-3 | **rejected** | **Never Completed** |
> After this patch, the futures of `sendAsync` never completed. It might be another bug to fix.
>
> > 2022-05-03T04:17:20,988+0800 [main] WARN org.apache.pulsar.client.impl.ProducerImpl - Message with sequence id 0 is definitely a duplicate
----
In short, this PR solves the problem that duplicated messages might be
accepted even if the previous messages were persisted successfully.
- With the default batching builder, the highest sequence id is updated in
the broker.
- (Before this patch) With the key based batching builder, the lowest
sequence id is updated in the broker.
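The effect of recording the lowest versus the highest sequence id can be sketched with a deliberately simplified model. It is not the broker's deduplication code: it assumes a message is accepted only when its sequence id is greater than the recorded one, it looks at one persisted batch at a time, and the names `DedupSketch`, `recordedSequenceId` and `isAccepted` are hypothetical.

```java
public class DedupSketch {
    // Hypothetical helper: the sequence id remembered after a batch is persisted.
    // recordHighest = false models the key based behavior before the patch.
    static long recordedSequenceId(long[] batch, boolean recordHighest) {
        long result = batch[0];
        for (long id : batch) {
            result = recordHighest ? Math.max(result, id) : Math.min(result, id);
        }
        return result;
    }

    // Simplified assumption: accept only sequence ids greater than the recorded one.
    static boolean isAccepted(long sequenceId, long recorded) {
        return sequenceId > recorded;
    }

    public static void main(String[] args) {
        // Batch A (0, 3) was persisted, then msg-3 is resent individually.
        long[] batchA = {0, 3};
        System.out.println("A persisted, record lowest:  msg-3 accepted = "
                + isAccepted(3, recordedSequenceId(batchA, false))); // 3 > 0
        System.out.println("A persisted, record highest: msg-3 accepted = "
                + isAccepted(3, recordedSequenceId(batchA, true)));  // 3 > 3

        // Only batch B (1, 2) was persisted, then msg-0 and msg-3 are resent.
        long[] batchB = {1, 2};
        for (boolean recordHighest : new boolean[]{false, true}) {
            long recorded = recordedSequenceId(batchB, recordHighest);
            System.out.println("B persisted, recorded = " + recorded
                    + ": msg-0 accepted = " + isAccepted(0, recorded)
                    + ", msg-3 accepted = " + isAccepted(3, recorded));
        }
    }
}
```

Under these assumptions, recording the lowest id lets the already persisted `msg-3` through, while recording the highest id rejects it. When only batch B was persisted, both variants reject `msg-0` and accept `msg-3`, matching the second table.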