cbornet commented on a change in pull request #6720: Make ServerCnx and
Producer independent of Netty
URL: https://github.com/apache/pulsar/pull/6720#discussion_r407206438
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
##########
@@ -137,42 +137,39 @@ public boolean equals(Object obj) {
}
public void publishMessage(long producerId, long sequenceId, ByteBuf
headersAndPayload, long batchSize) {
- beforePublish(producerId, sequenceId, headersAndPayload, batchSize);
- publishMessageToTopic(headersAndPayload, sequenceId, batchSize);
+ if (checkAndStartPublish(producerId, sequenceId, headersAndPayload,
batchSize)) {
+ publishMessageToTopic(headersAndPayload, sequenceId, batchSize);
+ }
}
public void publishMessage(long producerId, long lowestSequenceId, long
highestSequenceId,
- ByteBuf headersAndPayload, long batchSize) {
+ ByteBuf headersAndPayload, long batchSize) {
if (lowestSequenceId > highestSequenceId) {
- cnx.ctx().channel().eventLoop().execute(() -> {
- cnx.ctx().writeAndFlush(Commands.newSendError(producerId,
highestSequenceId, ServerError.MetadataError,
- "Invalid lowest or highest sequence id"));
+ execute(() -> {
+ sendError(producerId, highestSequenceId,
ServerError.MetadataError, "Invalid lowest or highest sequence id");
cnx.completedSendOperation(isNonPersistentTopic,
headersAndPayload.readableBytes());
});
- return;
}
- beforePublish(producerId, highestSequenceId, headersAndPayload,
batchSize);
Review comment:
beforePublish makes some checks.
publishMessageToTopic shouldn't be called if those checks fail.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services