This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new ab1b5c00565 [fix] [client] Fix memory leak when publishing encountered 
a corner case error (#23738)
ab1b5c00565 is described below

commit ab1b5c00565adfe877719130127fc23ac9c5a0c1
Author: fengyubiao <[email protected]>
AuthorDate: Fri Dec 20 10:59:04 2024 +0800

    [fix] [client] Fix memory leak when publishing encountered a corner case 
error (#23738)
    
    Co-authored-by: Yunze Xu <[email protected]>
---
 .../pulsar/client/impl/ProducerMemoryLeakTest.java | 364 +++++++++++++++++++++
 .../client/impl/BatchMessageContainerImpl.java     |   4 +-
 .../apache/pulsar/client/impl/ProducerImpl.java    |  54 ++-
 .../pulsar/client/impl/ProducerInterceptors.java   |  12 +-
 4 files changed, 417 insertions(+), 17 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLeakTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLeakTest.java
new file mode 100644
index 00000000000..dcdfd136476
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLeakTest.java
@@ -0,0 +1,364 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mockStatic;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+import io.netty.buffer.ByteBuf;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.client.api.CompressionType;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.interceptor.ProducerInterceptor;
+import org.apache.pulsar.common.protocol.ByteBufPair;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.awaitility.Awaitility;
+import org.mockito.MockedStatic;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+@Slf4j
+@Test(groups = "broker-api")
+public class ProducerMemoryLeakTest extends ProducerConsumerBase {
+
+    @BeforeClass(alwaysRun = true)
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+    }
+
+    @AfterClass(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test
+    public void testSendQueueIsFull() throws Exception {
+        final String topicName = 
BrokerTestUtil.newUniqueName("persistent://public/default/tp_");
+        admin.topics().createNonPartitionedTopic(topicName);
+        ProducerImpl<String> producer = (ProducerImpl<String>) 
pulsarClient.newProducer(Schema.STRING)
+                .blockIfQueueFull(false).maxPendingMessages(1)
+                .enableBatching(true).topic(topicName).create();
+        List<MsgPayloadTouchableMessageBuilder<String>> msgBuilderList = new 
ArrayList<>();
+        for (int i = 0; i < 100; i++) {
+            msgBuilderList.add(newMessage(producer));
+        }
+
+        CompletableFuture latestSendFuture = null;
+        for (MsgPayloadTouchableMessageBuilder<String> msgBuilder: 
msgBuilderList) {
+            latestSendFuture = msgBuilder.value("msg-1").sendAsync();
+        }
+        try{
+            latestSendFuture.join();
+        } catch (Exception ex) {
+            // Ignore the error PulsarClientException$ProducerQueueIsFullError.
+            assertTrue(FutureUtil.unwrapCompletionException(ex)
+                    instanceof PulsarClientException.ProducerQueueIsFullError);
+        }
+
+        // Verify: ref is expected.
+        producer.close();
+        for (int i = 0; i < msgBuilderList.size(); i++) {
+            MsgPayloadTouchableMessageBuilder<String> msgBuilder = 
msgBuilderList.get(i);
+            assertEquals(msgBuilder.payload.refCnt(), 1);
+            msgBuilder.release();
+            assertEquals(msgBuilder.payload.refCnt(), 0);
+        }
+        admin.topics().delete(topicName);
+    }
+
+    /**
+     * The content size of msg(value is "msg-1") will be "5".
+     * Then provides two param: 1 and 5.
+     *   1: reach the limitation before adding the message metadata.
+     *   2: reach the limitation after adding the message metadata.
+     */
+    @DataProvider(name = "maxMessageSizeAndCompressions")
+    public Object[][] maxMessageSizeAndCompressions(){
+        return new Object[][] {
+                {1, CompressionType.NONE},
+                {5, CompressionType.NONE},
+                {1, CompressionType.LZ4},
+                {6, CompressionType.LZ4}
+        };
+    }
+
+    @Test(dataProvider = "maxMessageSizeAndCompressions")
+    public void testSendMessageSizeExceeded(int maxMessageSize, 
CompressionType compressionType) throws Exception {
+        final String topicName = 
BrokerTestUtil.newUniqueName("persistent://public/default/tp_");
+        admin.topics().createNonPartitionedTopic(topicName);
+        ProducerImpl<String> producer = (ProducerImpl<String>) 
pulsarClient.newProducer(Schema.STRING).topic(topicName)
+                .compressionType(compressionType)
+                .enableBatching(false)
+                .create();
+        producer.getConnectionHandler().setMaxMessageSize(maxMessageSize);
+        MsgPayloadTouchableMessageBuilder<String> msgBuilder = 
newMessage(producer);
+        /**
+         * Mock an error: reached max message size, see more details {@link 
#maxMessageSizeAndCompressions()}.
+         */
+        try (MockedStatic<ByteBufPair> theMock = 
mockStatic(ByteBufPair.class)) {
+            List<ByteBufPair> generatedByteBufPairs = 
Collections.synchronizedList(new ArrayList<>());
+            theMock.when(() -> ByteBufPair.get(any(ByteBuf.class), 
any(ByteBuf.class))).then(invocation -> {
+                ByteBufPair byteBufPair = (ByteBufPair) 
invocation.callRealMethod();
+                generatedByteBufPairs.add(byteBufPair);
+                byteBufPair.retain();
+                return byteBufPair;
+            });
+            try {
+                msgBuilder.value("msg-1").send();
+                fail("expected an error that reached the max message size");
+            } catch (Exception ex) {
+                assertTrue(FutureUtil.unwrapCompletionException(ex)
+                        instanceof 
PulsarClientException.InvalidMessageException);
+            }
+
+            // Verify: message payload has been released.
+            // Since "MsgPayloadTouchableMessageBuilder" has called 
"buffer.retain" once, "refCnt()" should be "1".
+            producer.close();
+            Awaitility.await().untilAsserted(() -> {
+                assertEquals(producer.getPendingQueueSize(), 0);
+            });
+            // Verify: ByteBufPair generated for Pulsar Command.
+            if (maxMessageSize == 1) {
+                assertEquals(generatedByteBufPairs.size(),0);
+            } else {
+                assertEquals(generatedByteBufPairs.size(),1);
+                if (compressionType == CompressionType.NONE) {
+                    assertEquals(msgBuilder.payload.refCnt(), 2);
+                } else {
+                    assertEquals(msgBuilder.payload.refCnt(), 1);
+                }
+                for (ByteBufPair byteBufPair : generatedByteBufPairs) {
+                    assertEquals(byteBufPair.refCnt(), 1);
+                    byteBufPair.release();
+                    assertEquals(byteBufPair.refCnt(), 0);
+                }
+            }
+            // Verify: message.payload
+            assertEquals(msgBuilder.payload.refCnt(), 1);
+            msgBuilder.release();
+            assertEquals(msgBuilder.payload.refCnt(), 0);
+        }
+
+        // cleanup.
+        assertEquals(msgBuilder.payload.refCnt(), 0);
+        admin.topics().delete(topicName);
+    }
+
+    /**
+     * The content size of msg(value is "msg-1") will be "5".
+     * Then provides two param: 1 and 5.
+     *   1: Less than the limitation when adding the message into the 
batch-container.
+     *   3: Less than the limitation when building batched messages payload.
+     *   2: Equals the limitation when building batched messages payload.
+     */
+    @DataProvider(name = "maxMessageSizes")
+    public Object[][] maxMessageSizes(){
+        return new Object[][] {
+                {1},
+                {3},
+                {26}
+        };
+    }
+
+    @Test(dataProvider = "maxMessageSizes")
+    public void testBatchedSendMessageSizeExceeded(int maxMessageSize) throws 
Exception {
+        final String topicName = 
BrokerTestUtil.newUniqueName("persistent://public/default/tp_");
+        admin.topics().createNonPartitionedTopic(topicName);
+        ProducerImpl<String> producer = (ProducerImpl<String>) 
pulsarClient.newProducer(Schema.STRING).topic(topicName)
+                .enableBatching(true)
+                .compressionType(CompressionType.NONE)
+                .create();
+        final ClientCnx cnx = producer.getClientCnx();
+        producer.getConnectionHandler().setMaxMessageSize(maxMessageSize);
+        MsgPayloadTouchableMessageBuilder<String> msgBuilder1 = 
newMessage(producer);
+        MsgPayloadTouchableMessageBuilder<String> msgBuilder2 = 
newMessage(producer);
+        /**
+         * Mock an error: reached max message size. see more detail {@link 
#maxMessageSizes()}.
+         */
+        msgBuilder1.value("msg-1").sendAsync();
+        try {
+            msgBuilder2.value("msg-1").send();
+            if (maxMessageSize != 26) {
+                fail("expected an error that reached the max message size");
+            }
+        } catch (Exception ex) {
+            assertTrue(FutureUtil.unwrapCompletionException(ex)
+                    instanceof PulsarClientException.InvalidMessageException);
+        }
+
+        // Verify: message payload has been released.
+        // Since "MsgPayloadTouchableMessageBuilder" has called 
"buffer.retain" once, "refCnt()" should be "1".
+        producer.close();
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(producer.getPendingQueueSize(), 0);
+        });
+        assertEquals(msgBuilder1.payload.refCnt(), 1);
+        assertEquals(msgBuilder2.payload.refCnt(), 1);
+
+        // cleanup.
+        cnx.ctx().close();
+        msgBuilder1.release();
+        msgBuilder2.release();
+        assertEquals(msgBuilder1.payload.refCnt(), 0);
+        assertEquals(msgBuilder2.payload.refCnt(), 0);
+        admin.topics().delete(topicName);
+    }
+
+    @Test
+    public void testSendAfterClosedProducer() throws Exception {
+        final String topicName = 
BrokerTestUtil.newUniqueName("persistent://public/default/tp_");
+        admin.topics().createNonPartitionedTopic(topicName);
+        ProducerImpl<String> producer =
+                (ProducerImpl<String>) 
pulsarClient.newProducer(Schema.STRING).topic(topicName).create();
+        // Publish after the producer was closed.
+        MsgPayloadTouchableMessageBuilder<String> msgBuilder = 
newMessage(producer);
+        producer.close();
+        try {
+            msgBuilder.value("msg-1").send();
+            fail("expected an error that the producer has closed");
+        } catch (Exception ex) {
+            assertTrue(FutureUtil.unwrapCompletionException(ex)
+                    instanceof PulsarClientException.AlreadyClosedException);
+        }
+
+        // Verify: message payload has been released.
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(producer.getPendingQueueSize(), 0);
+        });
+        assertEquals(msgBuilder.payload.refCnt(), 1);
+
+        // cleanup.
+        msgBuilder.release();
+        assertEquals(msgBuilder.payload.refCnt(), 0);
+        admin.topics().delete(topicName);
+    }
+
+    @DataProvider
+    public Object[][] failedInterceptAt() {
+        return new Object[][]{
+            {"close"},
+            {"eligible"},
+            {"beforeSend"},
+            {"onSendAcknowledgement"},
+        };
+    }
+
+    @Test(dataProvider = "failedInterceptAt")
+    public void testInterceptorError(String method) throws Exception {
+        final String topicName = 
BrokerTestUtil.newUniqueName("persistent://public/default/tp_");
+        admin.topics().createNonPartitionedTopic(topicName);
+        ProducerImpl<String> producer = (ProducerImpl<String>) 
pulsarClient.newProducer(Schema.STRING).topic(topicName)
+                .intercept(
+
+                new ProducerInterceptor() {
+                    @Override
+                    public void close() {
+                        if (method.equals("close")) {
+                            throw new RuntimeException("Mocked error");
+                        }
+                    }
+
+                    @Override
+                    public boolean eligible(Message message) {
+                        if (method.equals("eligible")) {
+                            throw new RuntimeException("Mocked error");
+                        }
+                        return false;
+                    }
+
+                    @Override
+                    public Message beforeSend(Producer producer, Message 
message) {
+                        if (method.equals("beforeSend")) {
+                            throw new RuntimeException("Mocked error");
+                        }
+                        return message;
+                    }
+
+                    @Override
+                    public void onSendAcknowledgement(Producer producer, 
Message message, MessageId msgId,
+                                                      Throwable exception) {
+                        if (method.equals("onSendAcknowledgement")) {
+                            throw new RuntimeException("Mocked error");
+                        }
+
+                    }
+                }).create();
+
+        MsgPayloadTouchableMessageBuilder<String> msgBuilder = 
newMessage(producer);
+        try {
+            msgBuilder.value("msg-1").sendAsync().get(3, TimeUnit.SECONDS);
+            // It may throw error.
+        } catch (Exception ex) {
+            assertTrue(ex.getMessage().contains("Mocked"));
+        }
+
+        // Verify: message payload has been released.
+        producer.close();
+        assertEquals(msgBuilder.payload.refCnt(), 1);
+
+        // cleanup.
+        msgBuilder.release();
+        assertEquals(msgBuilder.payload.refCnt(), 0);
+        admin.topics().delete(topicName);
+    }
+
+    private <T> MsgPayloadTouchableMessageBuilder<T> 
newMessage(ProducerImpl<T> producer){
+        return new MsgPayloadTouchableMessageBuilder<T>(producer, 
producer.schema);
+    }
+
+    private static class MsgPayloadTouchableMessageBuilder<T> extends 
TypedMessageBuilderImpl {
+
+        public volatile ByteBuf payload;
+
+        public <T> MsgPayloadTouchableMessageBuilder(ProducerBase producer, 
Schema<T> schema) {
+            super(producer, schema);
+        }
+
+        @Override
+        public Message<T> getMessage() {
+            MessageImpl<T> msg = (MessageImpl<T>) super.getMessage();
+            payload = msg.getPayload();
+            // Retain the msg to avoid it be reused by other task.
+            payload.retain();
+            return msg;
+        }
+
+        public void release() {
+            payload.release();
+        }
+    }
+}
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
index 44f1fb27465..7262cfd11e0 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
@@ -290,8 +290,8 @@ class BatchMessageContainerImpl extends 
AbstractBatchMessageContainer {
             messages.forEach(msg -> producer.client.getMemoryLimitController()
                     .releaseMemory(msg.getUncompressedSize()));
             
producer.client.getMemoryLimitController().releaseMemory(batchAllocatedSizeBytes);
-            discard(new PulsarClientException.InvalidMessageException(
-                    "Message size is bigger than " + getMaxMessageSize() + " 
bytes"));
+            discard(new PulsarClientException.InvalidMessageException("Message 
size "
+                    + encryptedPayload.readableBytes() + " is bigger than " + 
getMaxMessageSize() + " bytes"));
             return null;
         }
         messageMetadata.setNumMessagesInBatch(numMessagesInBatch);
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index b686252b58a..10e0ee2ee3d 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -32,6 +32,9 @@ import static 
org.apache.pulsar.common.protocol.Commands.readChecksum;
 import static 
org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
 import com.google.common.annotations.VisibleForTesting;
 import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelOutboundHandler;
+import io.netty.channel.ChannelPromise;
 import io.netty.util.AbstractReferenceCounted;
 import io.netty.util.Recycler;
 import io.netty.util.Recycler.Handle;
@@ -483,19 +486,46 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
         return compressedPayload;
     }
 
+    /**
+     * Note on ByteBuf Release Behavior.
+     *
+     * <p>If you have a customized callback, please ignore the note below.</p>
+     *
+     * <p>When using the default callback, please confirm that the {@code 
refCnt()} value of the {@code message}
+     * (as returned by {@link MessageImpl#getDataBuffer}) is {@code 2} when 
you call this method. This is because
+     * the {@code ByteBuf} will be released twice under the following 
conditions:</p>
+     *
+     * <ul>
+     *   <li><b>Batch Messaging Enabled:</b>
+     *     <ol>
+     *       <li>Release 1: When the message is pushed into the batched 
message queue (see {@link #doBatchSendAndAdd}).
+     *       </li>
+     *       <li>Release 2: In the method {@link 
SendCallback#sendComplete(Throwable, OpSendMsgStats)}.</li>
+     *     </ol>
+     *   </li>
+     *   <li><b>Single Message (Batch Messaging Disabled):</b>
+     *     <ol>
+     *       <li>Release 1: When the message is written out by
+     *       {@link ChannelOutboundHandler#write(ChannelHandlerContext, 
Object, ChannelPromise)}.</li>
+     *       <li>Release 2: In the method {@link 
SendCallback#sendComplete(Throwable, OpSendMsgStats)}.</li>
+     *     </ol>
+     *   </li>
+     * </ul>
+     */
     public void sendAsync(Message<?> message, SendCallback callback) {
         checkArgument(message instanceof MessageImpl);
-
-        if (!isValidProducerState(callback, message.getSequenceId())) {
-            return;
-        }
-
         MessageImpl<?> msg = (MessageImpl<?>) message;
         MessageMetadata msgMetadata = msg.getMessageBuilder();
         ByteBuf payload = msg.getDataBuffer();
         final int uncompressedSize = payload.readableBytes();
 
+        if (!isValidProducerState(callback, message.getSequenceId())) {
+            payload.release();
+            return;
+        }
+
         if (!canEnqueueRequest(callback, message.getSequenceId(), 
uncompressedSize)) {
+            payload.release();
             return;
         }
 
@@ -573,6 +603,7 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
         for (int i = 0; i < (totalChunks - 1); i++) {
             if (!conf.isBlockIfQueueFull() && !canEnqueueRequest(callback, 
message.getSequenceId(),
                     0 /* The memory was already reserved */)) {
+                compressedPayload.release();
                 
client.getMemoryLimitController().releaseMemory(uncompressedSize);
                 semaphoreRelease(i + 1);
                 return;
@@ -603,6 +634,7 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
                 }
                 if (chunkId > 0 && conf.isBlockIfQueueFull() && 
!canEnqueueRequest(callback,
                         message.getSequenceId(), 0 /* The memory was already 
reserved */)) {
+                    compressedPayload.release();
                     
client.getMemoryLimitController().releaseMemory(uncompressedSize - 
readStartIndex);
                     semaphoreRelease(totalChunks - chunkId);
                     return;
@@ -723,10 +755,13 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
                     } else {
                         // handle boundary cases where message being added 
would exceed
                         // batch size and/or max message size
-                        boolean isBatchFull = batchMessageContainer.add(msg, 
callback);
-                        lastSendFuture = callback.getFuture();
-                        payload.release();
-                        triggerSendIfFullOrScheduleFlush(isBatchFull);
+                        try {
+                            boolean isBatchFull = 
batchMessageContainer.add(msg, callback);
+                            lastSendFuture = callback.getFuture();
+                            triggerSendIfFullOrScheduleFlush(isBatchFull);
+                        } finally {
+                            payload.release();
+                        }
                     }
                     isLastSequenceIdPotentialDuplicated = false;
                 }
@@ -2304,6 +2339,7 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
                 batchMessageAndSend(false);
             }
             if (isMessageSizeExceeded(op)) {
+                op.cmd.release();
                 return;
             }
             pendingMessages.add(op);
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerInterceptors.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerInterceptors.java
index 97f16c37b5d..38492ceae84 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerInterceptors.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerInterceptors.java
@@ -60,10 +60,10 @@ public class ProducerInterceptors implements Closeable {
     public Message beforeSend(Producer producer, Message message) {
         Message interceptorMessage = message;
         for (ProducerInterceptor interceptor : interceptors) {
-            if (!interceptor.eligible(message)) {
-                continue;
-            }
             try {
+                if (!interceptor.eligible(message)) {
+                    continue;
+                }
                 interceptorMessage = interceptor.beforeSend(producer, 
interceptorMessage);
             } catch (Throwable e) {
                 if (producer != null) {
@@ -93,10 +93,10 @@ public class ProducerInterceptors implements Closeable {
      */
     public void onSendAcknowledgement(Producer producer, Message message, 
MessageId msgId, Throwable exception) {
         for (ProducerInterceptor interceptor : interceptors) {
-            if (!interceptor.eligible(message)) {
-                continue;
-            }
             try {
+                if (!interceptor.eligible(message)) {
+                    continue;
+                }
                 interceptor.onSendAcknowledgement(producer, message, msgId, 
exception);
             } catch (Throwable e) {
                 log.warn("Error executing interceptor onSendAcknowledgement 
callback ", e);

Reply via email to