This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push:
new 24f90e764a7 [fix][client] Fix producer could send timeout when enable
batching (#19191)
24f90e764a7 is described below
commit 24f90e764a7a492d0937e80ae97fddee7a1a5a72
Author: houxiaoyu <[email protected]>
AuthorDate: Thu Jan 12 20:30:54 2023 +0800
[fix][client] Fix producer could send timeout when enable batching (#19191)
---
.../client/api/SimpleProducerConsumerTest.java | 22 ++++++++++++++++++++++
.../apache/pulsar/client/impl/ProducerImpl.java | 17 +++++++++++------
2 files changed, 33 insertions(+), 6 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 46513c70844..4e3fbfb40f2 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -4556,4 +4556,26 @@ public class SimpleProducerConsumerTest extends
ProducerConsumerBase {
consumer.close();
admin.topics().delete(topic, false);
}
+
+ @Test(timeOut = 30000)
+ public void testSendMsgGreaterThanBatchingMaxBytes() throws Exception { // sends one message larger than batchingMaxBytes through a batching producer
+ final String topic =
"persistent://my-property/my-ns/testSendMsgGreaterThanBatchingMaxBytes";
+ final int batchingMaxBytes = 1024; // byte limit passed to the producer builder below
+ final int timeoutSec = 10; // used both as the producer send timeout and as the wait bound on the send future
+ final byte[] msg = new byte[batchingMaxBytes * 2]; // payload is twice the configured batch byte limit
+ new Random().nextBytes(msg); // fill the payload with random bytes
+
+ @Cleanup
+ Producer<byte[]> producer = pulsarClient.newProducer()
+ .topic(topic)
+ .enableBatching(true)
+ .batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS)
+ .batchingMaxBytes(batchingMaxBytes)
+ .batchingMaxMessages(1000)
+ .sendTimeout(timeoutSec, TimeUnit.SECONDS)
+ .create();
+
+ // sendAsync should complete in time: get() waits at most timeoutSec seconds and the result must be non-null
+ assertNotNull(producer.sendAsync(msg).get(timeoutSec,
TimeUnit.SECONDS));
+ }
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 3c481e641c5..e9987a529f6 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -655,11 +655,7 @@ public class ProducerImpl<T> extends ProducerBase<T>
implements TimerTask, Conne
boolean isBatchFull = batchMessageContainer.add(msg,
callback);
lastSendFuture = callback.getFuture();
payload.release();
- if (isBatchFull) {
- batchMessageAndSend(false);
- } else {
- maybeScheduleBatchFlushTask();
- }
+ triggerSendIfFullOrScheduleFlush(isBatchFull);
}
isLastSequenceIdPotentialDuplicated = false;
}
@@ -872,6 +868,14 @@ public class ProducerImpl<T> extends ProducerBase<T>
implements TimerTask, Conne
&& batchMessageContainer.hasSameTxn(msg);
}
+ private void triggerSendIfFullOrScheduleFlush(boolean isBatchFull) { // common follow-up step after a message is added to batchMessageContainer
+ if (isBatchFull) { // callers pass the boolean returned by batchMessageContainer.add(msg, callback)
+ batchMessageAndSend(false); // container reported full: hand the batch to batchMessageAndSend right away
+ } else {
+ maybeScheduleBatchFlushTask(); // not full: leave the message in the container and rely on the flush task
+ }
+ }
+
private void doBatchSendAndAdd(MessageImpl<?> msg, SendCallback callback,
ByteBuf payload) {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Closing out batch to accommodate large
message with size {}", topic, producerName,
@@ -879,7 +883,8 @@ public class ProducerImpl<T> extends ProducerBase<T>
implements TimerTask, Conne
}
try {
batchMessageAndSend(false);
- batchMessageContainer.add(msg, callback);
+ boolean isBatchFull = batchMessageContainer.add(msg, callback);
+ triggerSendIfFullOrScheduleFlush(isBatchFull);
lastSendFuture = callback.getFuture();
} finally {
payload.release();