This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new c4196fba3ae [fix][broker] Record GeoPersistentReplicator.msgOut before
producer#sendAsync (#21673)
c4196fba3ae is described below
commit c4196fba3ae107d74f9421d3f7ed11c0c245f10f
Author: Zixuan Liu <[email protected]>
AuthorDate: Fri Dec 8 10:32:58 2023 +0800
[fix][broker] Record GeoPersistentReplicator.msgOut before
producer#sendAsync (#21673)
Signed-off-by: Zixuan Liu <[email protected]>
---
.../pulsar/broker/service/persistent/GeoPersistentReplicator.java | 4 +---
1 file changed, 1 insertion(+), 3 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java
index 08882982297..b8287dd2c14 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java
@@ -149,9 +149,6 @@ public class GeoPersistentReplicator extends PersistentReplicator {
}
dispatchRateLimiter.ifPresent(rateLimiter ->
rateLimiter.tryDispatchPermit(1, entry.getLength()));
-
- msgOut.recordEvent(headersAndPayload.readableBytes());
-
msg.setReplicatedFrom(localCluster);
headersAndPayload.retain();
@@ -181,6 +178,7 @@ public class GeoPersistentReplicator extends PersistentReplicator {
msg.setSchemaInfoForReplicator(schemaFuture.get());
msg.getMessageBuilder().clearTxnidMostBits();
msg.getMessageBuilder().clearTxnidLeastBits();
+ msgOut.recordEvent(headersAndPayload.readableBytes());
// Increment pending messages for messages produced locally
PENDING_MESSAGES_UPDATER.incrementAndGet(this);
producer.sendAsync(msg, ProducerSendCallback.create(this,
entry, msg));