This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 0c4db3abe59 [fix][client] Fix producer could send timeout when enable 
batching (#19191)
0c4db3abe59 is described below

commit 0c4db3abe599722e0a36a92c6730c8ab7ee06011
Author: houxiaoyu <[email protected]>
AuthorDate: Thu Jan 12 20:30:54 2023 +0800

    [fix][client] Fix producer could send timeout when enable batching (#19191)
---
 .../client/api/SimpleProducerConsumerTest.java     | 22 ++++++++++++++++++++++
 .../apache/pulsar/client/impl/ProducerImpl.java    | 17 +++++++++++------
 2 files changed, 33 insertions(+), 6 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 080e8f3852e..2d08b536f4e 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -4559,4 +4559,26 @@ public class SimpleProducerConsumerTest extends 
ProducerConsumerBase {
             assertEquals(values.get(i), "msg-" + i);
         }
     }
+
+    @Test(timeOut = 30000)
+    public void testSendMsgGreaterThanBatchingMaxBytes() throws Exception {
+        final String topic = 
"persistent://my-property/my-ns/testSendMsgGreaterThanBatchingMaxBytes";
+        final int batchingMaxBytes = 1024;
+        final int timeoutSec = 10;
+        final byte[] msg = new byte[batchingMaxBytes * 2];
+        new Random().nextBytes(msg);
+
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topic)
+                .enableBatching(true)
+                .batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS)
+                .batchingMaxBytes(batchingMaxBytes)
+                .batchingMaxMessages(1000)
+                .sendTimeout(timeoutSec, TimeUnit.SECONDS)
+                .create();
+
+        // sendAsync should complete in time
+        assertNotNull(producer.sendAsync(msg).get(timeoutSec, 
TimeUnit.SECONDS));
+    }
 }
\ No newline at end of file
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index ef3ead790bc..b8f249dbc0e 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -665,11 +665,7 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
                         boolean isBatchFull = batchMessageContainer.add(msg, 
callback);
                         lastSendFuture = callback.getFuture();
                         payload.release();
-                        if (isBatchFull) {
-                            batchMessageAndSend(false);
-                        } else {
-                            maybeScheduleBatchFlushTask();
-                        }
+                        triggerSendIfFullOrScheduleFlush(isBatchFull);
                     }
                     isLastSequenceIdPotentialDuplicated = false;
                 }
@@ -873,6 +869,14 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
                 && batchMessageContainer.hasSameTxn(msg);
     }
 
+    private void triggerSendIfFullOrScheduleFlush(boolean isBatchFull) {
+        if (isBatchFull) {
+            batchMessageAndSend(false);
+        } else {
+            maybeScheduleBatchFlushTask();
+        }
+    }
+
     private void doBatchSendAndAdd(MessageImpl<?> msg, SendCallback callback, 
ByteBuf payload) {
         if (log.isDebugEnabled()) {
             log.debug("[{}] [{}] Closing out batch to accommodate large 
message with size {}", topic, producerName,
@@ -880,7 +884,8 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
         }
         try {
             batchMessageAndSend(false);
-            batchMessageContainer.add(msg, callback);
+            boolean isBatchFull = batchMessageContainer.add(msg, callback);
+            triggerSendIfFullOrScheduleFlush(isBatchFull);
             lastSendFuture = callback.getFuture();
         } finally {
             payload.release();

Reply via email to