This is an automated email from the ASF dual-hosted git repository.
mmarshall pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new ce419cbd2f3 [imp][java client] lastBatchSendNanoTime initialization
(#17058)
ce419cbd2f3 is described below
commit ce419cbd2f3fe0b66ede580e3f4e19693c1f0240
Author: Michael Marshall <[email protected]>
AuthorDate: Thu Aug 11 11:58:37 2022 -0500
[imp][java client] lastBatchSendNanoTime initialization (#17058)
### Motivation
https://github.com/apache/pulsar/pull/14185 had a minor bug in that the
first message sent to the producer would always deliver because
`lastBatchSendNanoTime` was not initialized correctly. In
https://github.com/apache/pulsar/pull/15406, we improved the initialization,
but I am concerned that it didn't completely solve the problem because the
counter is initialized before grabbing the connection, which will often take
longer than the `batchingMaxPublishDelayMicros`, which defaults to [...]
If possible, I hope to get this merged and cherry-picked to branch-2.11 as
soon as possible.
### Modifications
* Update the `batchFlushTask()` method to handle the initialization case
for `lastBatchSendNanoTime`.
* Remove the initialization for `lastBatchSendNanoTime` in the constructor.
### Verifying this change
This test is fairly trivial to verify, but is hard to test for. I manually
verified using debug logs to see that the time is correctly initialized on the
first run and not afterwards.
### Documentation
- [x] `doc-not-needed`
---
.../main/java/org/apache/pulsar/client/impl/ProducerImpl.java | 9 ++++++---
1 file changed, 6 insertions(+), 3 deletions(-)
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 210fc14eb66..64ed61bd995 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -255,7 +255,6 @@ public class ProducerImpl<T> extends ProducerBase<T>
implements TimerTask, Conne
}
this.batchMessageContainer = (BatchMessageContainerBase)
containerBuilder.build();
this.batchMessageContainer.setProducer(this);
- this.lastBatchSendNanoTime = System.nanoTime();
} else {
this.batchMessageContainer = null;
}
@@ -2091,9 +2090,13 @@ public class ProducerImpl<T> extends ProducerBase<T>
implements TimerTask, Conne
long microsSinceLastSend =
TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - lastBatchSendNanoTime);
if (microsSinceLastSend < conf.getBatchingMaxPublishDelayMicros()) {
scheduleBatchFlushTask(conf.getBatchingMaxPublishDelayMicros() -
microsSinceLastSend);
- return;
+ } else if (lastBatchSendNanoTime == 0) {
+ // The first time a producer sends a message, the
lastBatchSendNanoTime is 0.
+ lastBatchSendNanoTime = System.nanoTime();
+ scheduleBatchFlushTask(conf.getBatchingMaxPublishDelayMicros());
+ } else {
+ batchMessageAndSend(true);
}
- batchMessageAndSend(true);
}
// must acquire semaphore before enqueuing