This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 0c4db3abe59 [fix][client] Fix producer could send timeout when enable
batching (#19191)
0c4db3abe59 is described below
commit 0c4db3abe599722e0a36a92c6730c8ab7ee06011
Author: houxiaoyu <[email protected]>
AuthorDate: Thu Jan 12 20:30:54 2023 +0800
[fix][client] Fix producer could send timeout when enable batching (#19191)
---
.../client/api/SimpleProducerConsumerTest.java | 22 ++++++++++++++++++++++
.../apache/pulsar/client/impl/ProducerImpl.java | 17 +++++++++++------
2 files changed, 33 insertions(+), 6 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 080e8f3852e..2d08b536f4e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -4559,4 +4559,26 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
assertEquals(values.get(i), "msg-" + i);
}
}
+
+ @Test(timeOut = 30000)
+ public void testSendMsgGreaterThanBatchingMaxBytes() throws Exception {
+ final String topic = "persistent://my-property/my-ns/testSendMsgGreaterThanBatchingMaxBytes";
+ final int batchingMaxBytes = 1024;
+ final int timeoutSec = 10;
+ final byte[] msg = new byte[batchingMaxBytes * 2];
+ new Random().nextBytes(msg);
+
+ @Cleanup
+ Producer<byte[]> producer = pulsarClient.newProducer()
+ .topic(topic)
+ .enableBatching(true)
+ .batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS)
+ .batchingMaxBytes(batchingMaxBytes)
+ .batchingMaxMessages(1000)
+ .sendTimeout(timeoutSec, TimeUnit.SECONDS)
+ .create();
+
+ // sendAsync should complete in time
+ assertNotNull(producer.sendAsync(msg).get(timeoutSec, TimeUnit.SECONDS));
+ }
}
\ No newline at end of file
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index ef3ead790bc..b8f249dbc0e 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -665,11 +665,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
boolean isBatchFull = batchMessageContainer.add(msg, callback);
lastSendFuture = callback.getFuture();
payload.release();
- if (isBatchFull) {
- batchMessageAndSend(false);
- } else {
- maybeScheduleBatchFlushTask();
- }
+ triggerSendIfFullOrScheduleFlush(isBatchFull);
}
isLastSequenceIdPotentialDuplicated = false;
}
@@ -873,6 +869,14 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
&& batchMessageContainer.hasSameTxn(msg);
}
+ private void triggerSendIfFullOrScheduleFlush(boolean isBatchFull) {
+ if (isBatchFull) {
+ batchMessageAndSend(false);
+ } else {
+ maybeScheduleBatchFlushTask();
+ }
+ }
+
private void doBatchSendAndAdd(MessageImpl<?> msg, SendCallback callback, ByteBuf payload) {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Closing out batch to accommodate large message with size {}", topic, producerName,
@@ -880,7 +884,8 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
}
try {
batchMessageAndSend(false);
- batchMessageContainer.add(msg, callback);
+ boolean isBatchFull = batchMessageContainer.add(msg, callback);
+ triggerSendIfFullOrScheduleFlush(isBatchFull);
lastSendFuture = callback.getFuture();
} finally {
payload.release();