BewareMyPower commented on PR #15413:
URL: https://github.com/apache/pulsar/pull/15413#issuecomment-1115325307

   First, neither case should be valid: users are responsible for making sequence ids unique.
   
   What this PR aims to solve is the **resend** of an existing message. For example, with key-based batching, 4 messages might be split into two groups:
   
   - A: 0, 3 (i.e. the messages whose key is "A", with sequence ids 0 and 3)
   - B: 1, 2
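   For illustration, here is a minimal sketch of how key-based batching could group these four messages by key. It is a simplified model, not the Pulsar client's actual batch container; the class `KeyBasedGrouping`, the nested `Msg` type and the method `groupByKey` are hypothetical.

   ```java
   import java.util.ArrayList;
   import java.util.LinkedHashMap;
   import java.util.List;
   import java.util.Map;

   final class KeyBasedGrouping {
       // Hypothetical message model: only the key and the sequence id matter here
       static final class Msg {
           final String key;
           final long sequenceId;

           Msg(String key, long sequenceId) {
               this.key = key;
               this.sequenceId = sequenceId;
           }
       }

       // Groups sequence ids by message key, keeping send order inside each group
       static Map<String, List<Long>> groupByKey(List<Msg> messages) {
           Map<String, List<Long>> batches = new LinkedHashMap<>();
           for (Msg m : messages) {
               batches.computeIfAbsent(m.key, k -> new ArrayList<>()).add(m.sequenceId);
           }
           return batches;
       }

       public static void main(String[] args) {
           List<Msg> messages = List.of(
                   new Msg("A", 0), new Msg("B", 1), new Msg("B", 2), new Msg("A", 3));
           System.out.println(groupByKey(messages)); // prints {A=[0, 3], B=[1, 2]}
       }
   }
   ```

   Batch A thus carries sequence ids 0 and 3, so its lowest and highest sequence ids differ, which is exactly what matters for deduplication in the cases that follow.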
   
   If the client didn't receive the ack receipt of batch A, it might resend batch A (msg-0 and msg-3). There are two possible cases:
   1. Batch A has been persisted -> the broker should not accept the resend
   2. Batch A has not been persisted -> the broker should accept the resend
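   The broker-side decision behind these two cases boils down to one comparison against the highest sequence id recorded for the producer. The sketch below is a simplified model that ignores messages still in flight; `DedupRule.isDuplicate` is a hypothetical helper, not the broker's actual code.

   ```java
   final class DedupRule {
       // Hypothetical rule: a message is a duplicate if its sequence id is not
       // greater than the highest sequence id recorded as persisted
       static boolean isDuplicate(long sequenceId, long highestPersisted) {
           return sequenceId <= highestPersisted;
       }

       public static void main(String[] args) {
           // Case 1: batch A (0, 3) was persisted, so the recorded id should be at least 3
           System.out.println(isDuplicate(3, 3)); // prints true  -> reject the resend
           // Case 2: only batch B (1, 2) was persisted, so the recorded id is at most 2
           System.out.println(isDuplicate(3, 2)); // prints false -> accept the resend
       }
   }
   ```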
   
   Now assume msg-0 and msg-3 are resent **individually (without batching)**.
   
   The 1st case can be verified easily with the following unit test:
   
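   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.concurrent.CompletableFuture;
   import java.util.stream.Collectors;
   import org.apache.pulsar.client.api.BatcherBuilder;
   import org.apache.pulsar.client.api.MessageId;
   import org.apache.pulsar.client.api.Producer;
   import org.apache.pulsar.client.api.Schema;
   import org.apache.pulsar.client.api.TypedMessageBuilder;
   import org.testng.annotations.Test;

       // Part of a broker test class whose setup provides `admin` and `pulsarClient`
       @Test
       public void test() throws Exception {
           final String topic = "persistent://my-property/my-ns/test";
           admin.namespaces().setDeduplicationStatus("my-property/my-ns", true);

           final Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
                   .topic(topic)
                   .batcherBuilder(BatcherBuilder.KEY_BASED)
                   .create();
           // Key "A" -> batch A (msg-0, msg-3), key "B" -> batch B (msg-1, msg-2)
           final List<TypedMessageBuilder<String>> messagesToSend = new ArrayList<>();
           messagesToSend.add(producer.newMessage().value("msg-0").key("A").sequenceId(0));
           messagesToSend.add(producer.newMessage().value("msg-1").key("B").sequenceId(1));
           messagesToSend.add(producer.newMessage().value("msg-2").key("B").sequenceId(2));
           messagesToSend.add(producer.newMessage().value("msg-3").key("A").sequenceId(3));
           final List<CompletableFuture<MessageId>> futures = new ArrayList<>();
           messagesToSend.forEach(msg -> futures.add(msg.sendAsync()));
           producer.flush(); // flush the messages in the batch container
           // Collect into an ArrayList explicitly, since more ids are appended below
           final List<MessageId> ids = futures.stream()
                   .map(CompletableFuture::join)
                   .collect(Collectors.toCollection(ArrayList::new));
           // Resend the messages of batch A individually
           ids.add(messagesToSend.get(0).send());
           ids.add(messagesToSend.get(3).send());
           ids.forEach(System.out::println);
       }
   ```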
   
   | Resent message | Before this patch | After this patch |
   | -------------- | ----------------- | ---------------- |
   | msg-0 | rejected          | rejected         |
   | msg-3 | **accepted**      | rejected         |
   
   Before this patch, the resent `msg-3` is accepted. This behavior is clearly wrong, because both `msg-0` and `msg-3` have already been persisted.
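   The table can be reproduced with a short worked check, assuming the broker keeps per producer the largest recorded value $h$ over the persisted batches and rejects a resent message whose sequence id $s$ satisfies $s \le h$. Before this patch a key-based batch records its lowest sequence id; after it, the highest.

   ```math
   \begin{aligned}
   \text{before: } & h = \max(\min\{0,3\}, \min\{1,2\}) = \max(0,1) = 1, && 0 \le 1 \ (\text{reject}), \quad 3 > 1 \ (\text{accept}) \\
   \text{after: }  & h = \max(\max\{0,3\}, \max\{1,2\}) = \max(3,2) = 3, && 0 \le 3 \ (\text{reject}), \quad 3 \le 3 \ (\text{reject})
   \end{aligned}
   ```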
   
   For the 2nd case, we can modify the sending part of the test to:
   
   ```java
           final List<CompletableFuture<MessageId>> futures = new ArrayList<>();
           // Assume msg-0 and msg-3 are lost
           futures.add(messagesToSend.get(1).sendAsync());
           futures.add(messagesToSend.get(2).sendAsync());
           producer.flush(); // flush the messages in the batch container
   ```
   
   
   | Resent message | Before this patch | After this patch |
   | -------------- | ----------------- | ---------------- |
   | msg-0 | rejected          | rejected         |
   | msg-3 | accepted          | accepted         |
   
   The behaviors are the same, though for different reasons: before this patch, `msg-3` is accepted because `3 > 1`, while after this patch, it is accepted because `3 > 2`.
   
   **It's a pity that `msg-0` is still rejected as a duplicate.** Admittedly, this PR does not solve that problem either.
   
   However, if we resend `msg-0` and `msg-3` as a batch:
   
   ```java
           // Resend messages in batch A, this time as a batch
           futures.add(messagesToSend.get(0).sendAsync());
           futures.add(messagesToSend.get(3).sendAsync());
           final List<MessageId> newIds = futures.stream()
                   .map(CompletableFuture::join)
                   .collect(Collectors.toList());
   ```
   
   | Resent message | Before this patch | After this patch    |
   | -------------- | ----------------- | ------------------- |
   | msg-0 | **rejected**      | **Never Completed** |
   | msg-3 | **rejected**      | **Never Completed** |
   
   > After this patch, the futures returned by `sendAsync` never complete. This might be another bug to fix.
   >
   > > 2022-05-03T04:17:20,988+0800 [main] WARN  org.apache.pulsar.client.impl.ProducerImpl - Message with sequence id 0 is definitely a duplicate
   
   ----
   
   In short, this PR fixes the problem that duplicated messages might be accepted even though the original messages were already persisted successfully.
   
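   - With the default batcher builder, the broker records the highest sequence id of each batch.
   - (Before this patch) With the key-based batcher builder, the broker records the lowest sequence id of each batch.
   - (After this patch) With the key-based batcher builder, the broker records the highest sequence id of each batch as well.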
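   The two recording policies summarized above can be compared with a small simulation. It is a simplified model, not the broker's deduplication code: the class `DedupPolicies`, the `Policy` enum (`LOWEST` for key-based batching before this patch, `HIGHEST` after it) and the helper methods are hypothetical, and a resent message is assumed to be accepted only if its sequence id exceeds the highest recorded value.

   ```java
   import java.util.List;

   final class DedupPolicies {
       // Hypothetical: which sequence id of a persisted batch the broker records
       enum Policy { LOWEST, HIGHEST }

       // Value recorded for one persisted batch under the given policy
       static long recorded(List<Long> batch, Policy policy) {
           long result = batch.get(0);
           for (long seq : batch) {
               result = policy == Policy.LOWEST ? Math.min(result, seq) : Math.max(result, seq);
           }
           return result;
       }

       // Highest recorded value over all persisted batches (-1 if none were persisted)
       static long highestRecorded(List<List<Long>> persisted, Policy policy) {
           long highest = -1;
           for (List<Long> batch : persisted) {
               highest = Math.max(highest, recorded(batch, policy));
           }
           return highest;
       }

       // A resent message passes only if its sequence id is above the recorded value
       static boolean accepted(long sequenceId, long highest) {
           return sequenceId > highest;
       }

       public static void main(String[] args) {
           List<Long> batchA = List.of(0L, 3L);
           List<Long> batchB = List.of(1L, 2L);
           List<List<Long>> case1 = List.of(batchA, batchB); // both batches persisted
           List<List<Long>> case2 = List.of(batchB);         // batch A was lost
           for (Policy policy : Policy.values()) {
               for (List<List<Long>> persisted : List.of(case1, case2)) {
                   long h = highestRecorded(persisted, policy);
                   System.out.printf("%s h=%d msg-0 %s, msg-3 %s%n", policy, h,
                           accepted(0, h) ? "accepted" : "rejected",
                           accepted(3, h) ? "accepted" : "rejected");
               }
           }
       }
   }
   ```

   Under these assumptions, `LOWEST` gives `h=1` in both cases (so `msg-3` is accepted even when batch A was persisted), while `HIGHEST` gives `h=3` for the 1st case and `h=2` for the 2nd, matching the first two tables.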