cbornet commented on a change in pull request #6720: Make ServerCnx and 
Producer independent of Netty
URL: https://github.com/apache/pulsar/pull/6720#discussion_r407206438
 
 

 ##########
 File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
 ##########
 @@ -137,42 +137,39 @@ public boolean equals(Object obj) {
     }
 
     public void publishMessage(long producerId, long sequenceId, ByteBuf 
headersAndPayload, long batchSize) {
-        beforePublish(producerId, sequenceId, headersAndPayload, batchSize);
-        publishMessageToTopic(headersAndPayload, sequenceId, batchSize);
+        if (checkAndStartPublish(producerId, sequenceId, headersAndPayload, 
batchSize)) {
+            publishMessageToTopic(headersAndPayload, sequenceId, batchSize);
+        }
     }
 
     public void publishMessage(long producerId, long lowestSequenceId, long 
highestSequenceId,
-           ByteBuf headersAndPayload, long batchSize) {
+            ByteBuf headersAndPayload, long batchSize) {
         if (lowestSequenceId > highestSequenceId) {
-            cnx.ctx().channel().eventLoop().execute(() -> {
-                cnx.ctx().writeAndFlush(Commands.newSendError(producerId, 
highestSequenceId, ServerError.MetadataError,
-                        "Invalid lowest or highest sequence id"));
+            execute(() -> {
+                sendError(producerId, highestSequenceId, 
ServerError.MetadataError, "Invalid lowest or highest sequence id");
                 cnx.completedSendOperation(isNonPersistentTopic, 
headersAndPayload.readableBytes());
             });
-            return;
         }
-        beforePublish(producerId, highestSequenceId, headersAndPayload, 
batchSize);
 
 Review comment:
   beforePublish makes some checks.
   publishMessageToTopic shouldn't be called if those checks fail.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to