luky116 opened a new issue, #21886:
URL: https://github.com/apache/pulsar/issues/21886

   ### Search before asking
   
   - [X] I searched in the [issues](https://github.com/apache/pulsar/issues) 
and found nothing similar.
   
   
   ### Version
   
   The latest master branch code
   
   ### Minimal reproduce step
   
   This is a question I had after reading the Pulsar source code. I’m not sure
   if it’s a real problem.
   
   In `org.apache.pulsar.broker.service.ServerCnx#handleSend` (line 1838),
   there is logic that verifies the sequence ID before publishing:
   
   ```java
   // Persist the message
   if (send.hasHighestSequenceId() && send.getSequenceId() <= send.getHighestSequenceId()) {
       producer.publishMessage(send.getProducerId(), send.getSequenceId(), send.getHighestSequenceId(),
               headersAndPayload, send.getNumMessages(), send.isIsChunk(), send.isMarker(), position);
   } else {
       producer.publishMessage(send.getProducerId(), send.getSequenceId(), headersAndPayload,
               send.getNumMessages(), send.isIsChunk(), send.isMarker(), position);
   }
   ```
   
   However, this code does not reject the following case. When it is true, the
   command falls through to the `else` branch and the highest sequence ID is
   ignored, whereas an error should be returned:
   ```java
   send.hasHighestSequenceId() && send.getSequenceId() > send.getHighestSequenceId()
   ```
   
   ### What did you expect to see?
   
   I expected the following case to be checked (if it occurs, an error should
   be returned):
   ```java
   send.hasHighestSequenceId() && send.getSequenceId() > send.getHighestSequenceId()
   ```
   
   ### What did you see instead?
   
   The `org.apache.pulsar.broker.service.Producer#publishMessage` overload that
   takes a highest sequence ID already checks for
   `lowestSequenceId > highestSequenceId`:
   ```java
   public void publishMessage(long producerId, long lowestSequenceId, long highestSequenceId,
           ByteBuf headersAndPayload, long batchSize, boolean isChunked, boolean isMarker, Position position) {
       if (lowestSequenceId > highestSequenceId) {
   ```
   so `org.apache.pulsar.broker.service.ServerCnx#handleSend` could be
   changed to:
   ```java
   // Persist the message
   if (send.hasHighestSequenceId()) {
       producer.publishMessage(send.getProducerId(), send.getSequenceId(), send.getHighestSequenceId(),
               headersAndPayload, send.getNumMessages(), send.isIsChunk(), send.isMarker(), position);
   } else {
       producer.publishMessage(send.getProducerId(), send.getSequenceId(), headersAndPayload,
               send.getNumMessages(), send.isIsChunk(), send.isMarker(), position);
   }
   ```
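
To make the difference between the two conditions concrete, here is a minimal standalone sketch that models only the branching. It is not Pulsar code: the class `SequenceIdDispatch`, the `Path` enum, and the methods `current`, `proposed`, and `rangePublish` are hypothetical names. What the real `Producer#publishMessage` does inside its `lowestSequenceId > highestSequenceId` branch is not shown in this issue, so the sketch simply assumes the call is rejected there.

```java
public class SequenceIdDispatch {

    /** Hypothetical labels for the path a send command takes; not Pulsar types. */
    public enum Path { RANGE_PUBLISH, SINGLE_PUBLISH, REJECTED_BY_RANGE_CHECK }

    /** Models the condition currently used in handleSend. */
    public static Path current(boolean hasHighest, long sequenceId, long highestSequenceId) {
        if (hasHighest && sequenceId <= highestSequenceId) {
            return rangePublish(sequenceId, highestSequenceId);
        }
        // An inverted range (sequenceId > highestSequenceId) also lands here,
        // so the highest sequence ID is silently dropped.
        return Path.SINGLE_PUBLISH;
    }

    /** Models the condition proposed in this issue. */
    public static Path proposed(boolean hasHighest, long sequenceId, long highestSequenceId) {
        if (hasHighest) {
            return rangePublish(sequenceId, highestSequenceId);
        }
        return Path.SINGLE_PUBLISH;
    }

    /**
     * Stand-in for the publishMessage overload that takes both IDs.
     * Assumption: an inverted range is rejected by its existing guard.
     */
    public static Path rangePublish(long lowestSequenceId, long highestSequenceId) {
        if (lowestSequenceId > highestSequenceId) {
            return Path.REJECTED_BY_RANGE_CHECK;
        }
        return Path.RANGE_PUBLISH;
    }

    public static void main(String[] args) {
        // Inverted range: sequenceId = 10, highestSequenceId = 5.
        System.out.println(current(true, 10, 5));  // prints SINGLE_PUBLISH
        System.out.println(proposed(true, 10, 5)); // prints REJECTED_BY_RANGE_CHECK
    }
}
```

For valid ranges and for commands without a highest sequence ID, both variants take the same path; they differ only for the inverted range.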
   
   ### Anything else?
   
   no
   
   ### Are you willing to submit a PR?
   
   - [X] I'm willing to submit a PR!

