luky116 opened a new issue, #21886: URL: https://github.com/apache/pulsar/issues/21886
### Search before asking - [X] I searched in the [issues](https://github.com/apache/pulsar/issues) and found nothing similar. ### Version The latest master branch code ### Minimal reproduce step This is a question I had after reading the pulsar source code. I’m not sure if it’s a real problem. In function org.apache.pulsar.broker.service.ServerCnx#handleSend#line 1838,There is a logic to verify sequenceId, the code is as follows: ```java // Persist the message if (send.hasHighestSequenceId() && send.getSequenceId() <= send.getHighestSequenceId()) { producer.publishMessage(send.getProducerId(), send.getSequenceId(), send.getHighestSequenceId(), headersAndPayload, send.getNumMessages(), send.isIsChunk(), send.isMarker(), position); } else { producer.publishMessage(send.getProducerId(), send.getSequenceId(), headersAndPayload, send.getNumMessages(), send.isIsChunk(), send.isMarker(), position); } ``` But he did not check the following logic (if the following situation occurs, an exception should be returned): ```java send.hasHighestSequenceId() && send.getSequenceId() > send.getHighestSequenceId() ``` ### What did you expect to see? I hope to check the following logic (if the following situation occurs, an exception should be returned): ```java send.hasHighestSequenceId() && send.getSequenceId() > send.getHighestSequenceId() ``` ### What did you see instead? I find `org.apache.pulsar.broker.service.Producer#publishMessage` function makes a judgment of `lowestSequenceId > highestSequenceId`: ```java public void publishMessage(long producerId, long lowestSequenceId, long highestSequenceId, ByteBuf headersAndPayload, long batchSize, boolean isChunked, boolean isMarker, Position position) { if (lowestSequenceId > highestSequenceId) { ``` so `org.apache.pulsar.broker.service.ServerCnx#handleSend` function can be changed to: ```java // Persist the message if (send.hasHighestSequenceId()) { producer.publishMessage(send.getProducerId(), send.getSequenceId(), send.getHighestSequenceId(), headersAndPayload, send.getNumMessages(), send.isIsChunk(), send.isMarker(), position); } else { producer.publishMessage(send.getProducerId(), send.getSequenceId(), headersAndPayload, send.getNumMessages(), send.isIsChunk(), send.isMarker(), position); } ``` ### Anything else? no ### Are you willing to submit a PR? - [X] I'm willing to submit a PR! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
