poorbarcode commented on code in PR #22393:
URL: https://github.com/apache/pulsar/pull/22393#discussion_r1548195395
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java:
##########
@@ -378,80 +378,93 @@ CompletableFuture<MessageId> internalSendAsync(Message<?>
message) {
pendingMessagesUpDownCounter.increment();
pendingBytesUpDownCounter.add(msgSize);
- sendAsync(interceptorMessage, new SendCallback() {
- SendCallback nextCallback = null;
- MessageImpl<?> nextMsg = null;
- long createdAt = System.nanoTime();
+ sendAsync(interceptorMessage, new DefaultSendMessageCallback(future,
interceptorMessage, msgSize));
+ return future;
+ }
- @Override
- public CompletableFuture<MessageId> getFuture() {
- return future;
- }
+ private class DefaultSendMessageCallback implements SendCallback {
- @Override
- public SendCallback getNextSendCallback() {
- return nextCallback;
- }
+ CompletableFuture<MessageId> sendFuture;
+ MessageImpl<?> currentMsg;
+ int msgSize;
+ long createdAt = System.nanoTime();
+ SendCallback nextCallback = null;
+ MessageImpl<?> nextMsg = null;
- @Override
- public MessageImpl<?> getNextMessage() {
- return nextMsg;
- }
+ DefaultSendMessageCallback(CompletableFuture<MessageId> sendFuture,
MessageImpl<?> currentMsg, int msgSize) {
+ this.sendFuture = sendFuture;
+ this.currentMsg = currentMsg;
+ this.msgSize = msgSize;
+ }
- @Override
- public void sendComplete(Exception e) {
- long latencyNanos = System.nanoTime() - createdAt;
- pendingMessagesUpDownCounter.decrement();
- pendingBytesUpDownCounter.subtract(msgSize);
+ @Override
+ public CompletableFuture<MessageId> getFuture() {
+ return sendFuture;
+ }
+
+ @Override
+ public SendCallback getNextSendCallback() {
+ return nextCallback;
+ }
+
+ @Override
+ public MessageImpl<?> getNextMessage() {
+ return nextMsg;
+ }
+ @Override
+ public void sendComplete(Exception e) {
+ SendCallback loopingCallback = this;
+ MessageImpl<?> loopingMsg = currentMsg;
+ while (loopingCallback != null) {
+ onSendComplete(e, loopingCallback, loopingMsg);
+ loopingMsg = loopingCallback.getNextMessage();
+ loopingCallback = loopingCallback.getNextSendCallback();
+ }
+ }
+
+ private void onSendComplete(Exception e, SendCallback sendCallback,
MessageImpl<?> msg) {
+ long createdAt = (sendCallback instanceof
ProducerImpl.DefaultSendMessageCallback)
+ ? ((DefaultSendMessageCallback) sendCallback).createdAt :
this.createdAt;
+ long latencyNanos = System.nanoTime() - createdAt;
+ pendingMessagesUpDownCounter.decrement();
+ pendingBytesUpDownCounter.subtract(msgSize);
+ ByteBuf payload = msg.getDataBuffer();
+ if (e != null) {
+ latencyHistogram.recordFailure(latencyNanos);
+ stats.incrementSendFailed();
try {
- if (e != null) {
- latencyHistogram.recordFailure(latencyNanos);
- stats.incrementSendFailed();
- onSendAcknowledgement(interceptorMessage, null, e);
- future.completeExceptionally(e);
- } else {
- latencyHistogram.recordSuccess(latencyNanos);
- publishedBytesCounter.add(msgSize);
- onSendAcknowledgement(interceptorMessage,
interceptorMessage.getMessageId(), null);
- future.complete(interceptorMessage.getMessageId());
- stats.incrementNumAcksReceived(latencyNanos);
- }
+ onSendAcknowledgement(msg, null, e);
+ sendCallback.getFuture().completeExceptionally(e);
} finally {
- interceptorMessage.getDataBuffer().release();
+ if (payload == null) {
+ log.error("[{}] [{}] Payload is null when calling a
failed onSendComplete, which is not"
+ + " expected.", topic, producerName);
Review Comment:
Added a `return`. `ReferenceCountUtil.safeRelease(payload)` will do nothing
if the `payload` is null
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]