This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 18cb458f73b [improve][broker] Add callback parameters to the SendCallback.sendComplete (#23196)
18cb458f73b is described below

commit 18cb458f73b8967eb7c1f1f133e0b7f9f1a60e93
Author: crossoverJie <[email protected]>
AuthorDate: Wed Aug 21 09:41:53 2024 +0800

    [improve][broker] Add callback parameters to the SendCallback.sendComplete (#23196)
---
 .../nonpersistent/NonPersistentReplicator.java     |  3 +-
 .../service/persistent/PersistentReplicator.java   |  3 +-
 .../client/impl/ProduceWithMessageIdTest.java      | 85 +++++++++++++++++++++-
 .../client/impl/BatchMessageContainerImpl.java     |  2 +-
 .../apache/pulsar/client/impl/OpSendMsgStats.java  | 38 ++++++++++
 .../pulsar/client/impl/OpSendMsgStatsImpl.java     | 73 +++++++++++++++++++
 .../apache/pulsar/client/impl/ProducerImpl.java    | 34 ++++++---
 .../apache/pulsar/client/impl/SendCallback.java    |  5 +-
 8 files changed, 224 insertions(+), 19 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java
index 6441230fad8..45b4ebf6e17 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java
@@ -33,6 +33,7 @@ import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.client.impl.OpSendMsgStats;
 import org.apache.pulsar.client.impl.ProducerImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.SendCallback;
@@ -173,7 +174,7 @@ public class NonPersistentReplicator extends AbstractReplicator implements Repli
         private MessageImpl msg;
 
         @Override
-        public void sendComplete(Exception exception) {
+        public void sendComplete(Throwable exception, OpSendMsgStats opSendMsgStats) {
             if (exception != null) {
                 log.error("[{}] Error producing on remote broker", replicator.replicatorId, exception);
             } else {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
index 33e883ab940..b3d7546beed 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
@@ -60,6 +60,7 @@ import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.client.impl.OpSendMsgStats;
 import org.apache.pulsar.client.impl.ProducerImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.SendCallback;
@@ -377,7 +378,7 @@ public abstract class PersistentReplicator extends AbstractReplicator
         private MessageImpl msg;
 
         @Override
-        public void sendComplete(Exception exception) {
+        public void sendComplete(Throwable exception, OpSendMsgStats opSendMsgStats) {
             if (exception != null && !(exception instanceof PulsarClientException.InvalidMessageException)) {
                 log.error("[{}] Error producing on remote broker", replicator.replicatorId, exception);
                 // cursor should be rewinded since it was incremented when readMoreEntries
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProduceWithMessageIdTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProduceWithMessageIdTest.java
index b8efdeb9969..45f9a9c52e8 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProduceWithMessageIdTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProduceWithMessageIdTest.java
@@ -18,14 +18,18 @@
  */
 package org.apache.pulsar.client.impl;
 
+import static org.apache.pulsar.client.impl.AbstractBatchMessageContainer.INITIAL_BATCH_BUFFER_SIZE;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.MockBrokerService;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
@@ -38,13 +42,20 @@ import org.testng.annotations.Test;
 
 @Test(groups = "broker-impl")
 @Slf4j
-public class ProduceWithMessageIdTest {
+public class ProduceWithMessageIdTest extends ProducerConsumerBase {
     MockBrokerService mockBrokerService;
 
     @BeforeClass(alwaysRun = true)
-    public void setup() {
+    public void setup() throws Exception {
         mockBrokerService = new MockBrokerService();
         mockBrokerService.start();
+        super.internalSetup();
+        super.producerBaseSetup();
+    }
+
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
     }
 
     @AfterClass(alwaysRun = true)
@@ -86,7 +97,7 @@ public class ProduceWithMessageIdTest {
         AtomicBoolean result = new AtomicBoolean(false);
         producer.sendAsync(msg, new SendCallback() {
             @Override
-            public void sendComplete(Exception e) {
+            public void sendComplete(Throwable e, OpSendMsgStats opSendMsgStats) {
                 log.info("sendComplete", e);
                 result.set(e == null);
             }
@@ -115,4 +126,72 @@ public class ProduceWithMessageIdTest {
         // the result is true only if broker received right message id.
         Awaitility.await().untilTrue(result);
     }
+
+    @Test
+    public void sendWithCallBack() throws Exception {
+
+        int batchSize = 10;
+
+        String topic = "persistent://public/default/testSendWithCallBack";
+        ProducerImpl<byte[]> producer =
+                (ProducerImpl<byte[]>) pulsarClient.newProducer().topic(topic)
+                        .enableBatching(true)
+                        .batchingMaxMessages(batchSize)
+                        .create();
+
+        CountDownLatch cdl = new CountDownLatch(1);
+        AtomicReference<OpSendMsgStats> sendMsgStats = new AtomicReference<>();
+        SendCallback sendComplete = new SendCallback() {
+            @Override
+            public void sendComplete(Throwable e, OpSendMsgStats opSendMsgStats) {
+                log.info("sendComplete", e);
+                if (e == null){
+                    cdl.countDown();
+                    sendMsgStats.set(opSendMsgStats);
+                }
+            }
+
+            @Override
+            public void addCallback(MessageImpl<?> msg, SendCallback scb) {
+
+            }
+
+            @Override
+            public SendCallback getNextSendCallback() {
+                return null;
+            }
+
+            @Override
+            public MessageImpl<?> getNextMessage() {
+                return null;
+            }
+
+            @Override
+            public CompletableFuture<MessageId> getFuture() {
+                return null;
+            }
+        };
+        int totalReadabled = 0;
+        int totalUncompressedSize = 0;
+        for (int i = 0; i < batchSize; i++) {
+            MessageMetadata metadata = new MessageMetadata();
+            ByteBuffer buffer = ByteBuffer.wrap("data".getBytes(StandardCharsets.UTF_8));
+            MessageImpl<byte[]> msg = MessageImpl.create(metadata, buffer, Schema.BYTES, topic);
+            msg.getDataBuffer().retain();
+            totalReadabled += msg.getDataBuffer().readableBytes();
+            totalUncompressedSize += msg.getUncompressedSize();
+            producer.sendAsync(msg, sendComplete);
+        }
+
+        cdl.await();
+        OpSendMsgStats opSendMsgStats = sendMsgStats.get();
+        Assert.assertEquals(opSendMsgStats.getUncompressedSize(), totalUncompressedSize + INITIAL_BATCH_BUFFER_SIZE);
+        Assert.assertEquals(opSendMsgStats.getSequenceId(), 0);
+        Assert.assertEquals(opSendMsgStats.getRetryCount(), 1);
+        Assert.assertEquals(opSendMsgStats.getBatchSizeByte(), totalReadabled);
+        Assert.assertEquals(opSendMsgStats.getNumMessagesInBatch(), batchSize);
+        Assert.assertEquals(opSendMsgStats.getHighestSequenceId(), batchSize-1);
+        Assert.assertEquals(opSendMsgStats.getTotalChunks(), 0);
+        Assert.assertEquals(opSendMsgStats.getChunkId(), -1);
+    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
index a3c9d1bc9ab..44f1fb27465 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
@@ -229,7 +229,7 @@ class BatchMessageContainerImpl extends AbstractBatchMessageContainer {
         try {
             // Need to protect ourselves from any exception being thrown in the future handler from the application
             if (firstCallback != null) {
-                firstCallback.sendComplete(ex);
+                firstCallback.sendComplete(ex, null);
             }
             if (batchedMessageMetadataAndPayload != null) {
                 ReferenceCountUtil.safeRelease(batchedMessageMetadataAndPayload);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/OpSendMsgStats.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/OpSendMsgStats.java
new file mode 100644
index 00000000000..dc28df50f28
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/OpSendMsgStats.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+
+public interface OpSendMsgStats {
+    long getUncompressedSize();
+
+    long getSequenceId();
+
+    int getRetryCount();
+
+    long getBatchSizeByte();
+
+    int getNumMessagesInBatch();
+
+    long getHighestSequenceId();
+
+    int getTotalChunks();
+
+    int getChunkId();
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/OpSendMsgStatsImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/OpSendMsgStatsImpl.java
new file mode 100644
index 00000000000..41bb742776c
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/OpSendMsgStatsImpl.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import lombok.Builder;
+
+@Builder
+public class OpSendMsgStatsImpl implements OpSendMsgStats {
+    private long uncompressedSize;
+    private long sequenceId;
+    private int retryCount;
+    private long batchSizeByte;
+    private int numMessagesInBatch;
+    private long highestSequenceId;
+    private int totalChunks;
+    private int chunkId;
+
+    @Override
+    public long getUncompressedSize() {
+        return uncompressedSize;
+    }
+
+    @Override
+    public long getSequenceId() {
+        return sequenceId;
+    }
+
+    @Override
+    public int getRetryCount() {
+        return retryCount;
+    }
+
+    @Override
+    public long getBatchSizeByte() {
+        return batchSizeByte;
+    }
+
+    @Override
+    public int getNumMessagesInBatch() {
+        return numMessagesInBatch;
+    }
+
+    @Override
+    public long getHighestSequenceId() {
+        return highestSequenceId;
+    }
+
+    @Override
+    public int getTotalChunks() {
+        return totalChunks;
+    }
+
+    @Override
+    public int getChunkId() {
+        return chunkId;
+    }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 6d5a8145463..5c46057ae30 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -414,7 +414,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
         }
 
         @Override
-        public void sendComplete(Exception e) {
+        public void sendComplete(Throwable e, OpSendMsgStats opSendMsgStats) {
             SendCallback loopingCallback = this;
             MessageImpl<?> loopingMsg = currentMsg;
             while (loopingCallback != null) {
@@ -424,7 +424,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
             }
         }
 
-        private void onSendComplete(Exception e, SendCallback sendCallback, MessageImpl<?> msg) {
+        private void onSendComplete(Throwable e, SendCallback sendCallback, MessageImpl<?> msg) {
             long createdAt = (sendCallback instanceof ProducerImpl.DefaultSendMessageCallback)
                     ? ((DefaultSendMessageCallback) sendCallback).createdAt : this.createdAt;
             long latencyNanos = System.nanoTime() - createdAt;
@@ -842,7 +842,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
                 log.warn("[{}] [{}] GetOrCreateSchema error", topic, producerName, t);
                 if (t instanceof PulsarClientException.IncompatibleSchemaException) {
                     msg.setSchemaState(MessageImpl.SchemaState.Broken);
-                    callback.sendComplete((PulsarClientException.IncompatibleSchemaException) t);
+                    callback.sendComplete(t, null);
                 }
             } else {
                 log.info("[{}] [{}] GetOrCreateSchema succeed", topic, producerName);
@@ -985,19 +985,19 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
             case Closing:
             case Closed:
                 callback.sendComplete(
-                        new PulsarClientException.AlreadyClosedException("Producer already closed", sequenceId));
+                        new PulsarClientException.AlreadyClosedException("Producer already closed", sequenceId), null);
                 return false;
             case ProducerFenced:
-                callback.sendComplete(new PulsarClientException.ProducerFencedException("Producer was fenced"));
+                callback.sendComplete(new PulsarClientException.ProducerFencedException("Producer was fenced"), null);
                 return false;
             case Terminated:
                 callback.sendComplete(
-                        new PulsarClientException.TopicTerminatedException("Topic was terminated", sequenceId));
+                        new PulsarClientException.TopicTerminatedException("Topic was terminated", sequenceId), null);
                 return false;
             case Failed:
             case Uninitialized:
             default:
-                callback.sendComplete(new PulsarClientException.NotConnectedException(sequenceId));
+                callback.sendComplete(new PulsarClientException.NotConnectedException(sequenceId), null);
                 return false;
         }
     }
@@ -1012,20 +1012,20 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
             } else {
                 if (!semaphore.map(Semaphore::tryAcquire).orElse(true)) {
                     callback.sendComplete(new PulsarClientException.ProducerQueueIsFullError(
-                            "Producer send queue is full", sequenceId));
+                            "Producer send queue is full", sequenceId), null);
                     return false;
                 }
 
                 if (!client.getMemoryLimitController().tryReserveMemory(payloadSize)) {
                     semaphore.ifPresent(Semaphore::release);
                     callback.sendComplete(new PulsarClientException.MemoryBufferIsFullError(
-                            "Client memory buffer is full", sequenceId));
+                            "Client memory buffer is full", sequenceId), null);
                     return false;
                 }
             }
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
-            callback.sendComplete(new PulsarClientException(e, sequenceId));
+            callback.sendComplete(new PulsarClientException(e, sequenceId), null);
             return false;
         }
 
@@ -1302,7 +1302,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
     private void completeCallbackAndReleaseSemaphore(long payloadSize, SendCallback callback, Exception exception) {
         semaphore.ifPresent(Semaphore::release);
         client.getMemoryLimitController().releaseMemory(payloadSize);
-        callback.sendComplete(exception);
+        callback.sendComplete(exception, null);
     }
 
     /**
@@ -1595,7 +1595,17 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
                     rpcLatencyHistogram.recordFailure(now - this.lastSentAt);
                 }
 
-                callback.sendComplete(finalEx);
+                OpSendMsgStats opSendMsgStats = OpSendMsgStatsImpl.builder()
+                        .uncompressedSize(uncompressedSize)
+                        .sequenceId(sequenceId)
+                        .retryCount(retryCount)
+                        .batchSizeByte(batchSizeByte)
+                        .numMessagesInBatch(numMessagesInBatch)
+                        .highestSequenceId(highestSequenceId)
+                        .totalChunks(totalChunks)
+                        .chunkId(chunkId)
+                        .build();
+                callback.sendComplete(finalEx, opSendMsgStats);
             }
         }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SendCallback.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SendCallback.java
index 369bb34a29a..f55d7ae7912 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SendCallback.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SendCallback.java
@@ -20,18 +20,21 @@ package org.apache.pulsar.client.impl;
 
 import java.util.concurrent.CompletableFuture;
 import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.common.classification.InterfaceStability;
 
 /**
  *
  */
+@InterfaceStability.Evolving
 public interface SendCallback {
 
     /**
      * invoked when send operation completes.
      *
      * @param e
+     * @param opSendMsgStats stats associated with the send operation
      */
-    void sendComplete(Exception e);
+    void sendComplete(Throwable e, OpSendMsgStats opSendMsgStats);
 
     /**
      * used to specify a callback to be invoked on completion of a send operation for individual messages sent in a

Reply via email to