This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new de68e2511e0 [improve][broker] Optimize message payload traffic for ShadowReplicator (#23236)
de68e2511e0 is described below

commit de68e2511e016fa009852625ce21d1bffe47bf21
Author: Zike Yang <[email protected]>
AuthorDate: Thu Sep 5 11:07:40 2024 +0800

    [improve][broker] Optimize message payload traffic for ShadowReplicator (#23236)
---
 build/run_unit_group.sh                                            | 2 +-
 .../main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java   | 3 ++-
 .../apache/pulsar/broker/service/persistent/ShadowReplicator.java  | 6 +++---
 .../pulsar/broker/service/persistent/ShadowReplicatorTest.java     | 4 ++--
 .../pulsar/broker/service/persistent/ShadowTopicRealBkTest.java    | 3 ++-
 .../src/main/java/org/apache/pulsar/client/impl/MessageImpl.java   | 7 +++++++
 6 files changed, 17 insertions(+), 8 deletions(-)

diff --git a/build/run_unit_group.sh b/build/run_unit_group.sh
index 2694505e0e0..cdaf69e351b 100755
--- a/build/run_unit_group.sh
+++ b/build/run_unit_group.sh
@@ -80,7 +80,7 @@ function test_group_broker_group_1() {
 }
 
 function test_group_broker_group_2() {
-  mvn_test -pl pulsar-broker -Dgroups='schema,utils,functions-worker,broker-io,broker-discovery,broker-compaction,broker-naming,websocket,other'
+  mvn_test -pl pulsar-broker -Dgroups='schema,utils,functions-worker,broker-io,broker-discovery,broker-compaction,broker-naming,broker-replication,websocket,other'
 }
 
 function test_group_broker_group_3() {
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
index 539b62fe7fe..3f0699657b5 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
@@ -239,7 +239,8 @@ public class OpAddEntry implements AddCallback, 
CloseCallback, Runnable {
         ManagedLedgerImpl.TOTAL_SIZE_UPDATER.addAndGet(ml, dataLength);
 
         long ledgerId = ledger != null ? ledger.getId() : ((Position) 
ctx).getLedgerId();
-        if (ml.hasActiveCursors()) {
+        // Don't insert to the entry cache for the ShadowManagedLedger
+        if (!(ml instanceof ShadowManagedLedgerImpl) && ml.hasActiveCursors()) 
{
             // Avoid caching entries if no cursor has been created
             EntryImpl entry = EntryImpl.create(ledgerId, entryId, data);
             // EntryCache.insert: duplicates entry by allocating new entry and 
data. so, recycle entry after calling
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ShadowReplicator.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ShadowReplicator.java
index 25591857aa1..65bcbfd131f 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ShadowReplicator.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ShadowReplicator.java
@@ -67,7 +67,7 @@ public class ShadowReplicator extends PersistentReplicator {
                 ByteBuf headersAndPayload = entry.getDataBuffer();
                 MessageImpl msg;
                 try {
-                    msg = 
MessageImpl.deserializeSkipBrokerEntryMetaData(headersAndPayload);
+                    msg = 
MessageImpl.deserializeMetadataWithEmptyPayload(headersAndPayload);
                 } catch (Throwable t) {
                     log.error("[{}] Failed to deserialize message at {} 
(buffer size: {}): {}", replicatorId,
                             entry.getPosition(), length, t.getMessage(), t);
@@ -91,9 +91,9 @@ public class ShadowReplicator extends PersistentReplicator {
 
                 dispatchRateLimiter.ifPresent(rateLimiter -> 
rateLimiter.consumeDispatchQuota(1, entry.getLength()));
 
-                msgOut.recordEvent(headersAndPayload.readableBytes());
+                msgOut.recordEvent(msg.getDataBuffer().readableBytes());
                 stats.incrementMsgOutCounter();
-                
stats.incrementBytesOutCounter(headersAndPayload.readableBytes());
+                
stats.incrementBytesOutCounter(msg.getDataBuffer().readableBytes());
 
                 msg.setReplicatedFrom(localCluster);
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowReplicatorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowReplicatorTest.java
index 9f1885e034d..511cf87133a 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowReplicatorTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowReplicatorTest.java
@@ -142,8 +142,8 @@ public class ShadowReplicatorTest extends BrokerTestBase {
         Assert.assertEquals(shadowMessage.getBrokerPublishTime(), 
sourceMessage.getBrokerPublishTime());
         Assert.assertEquals(shadowMessage.getIndex(), 
sourceMessage.getIndex());
 
-        //`replicatedFrom` is set as localClusterName in shadow topic.
-        Assert.assertNotEquals(shadowMessage.getReplicatedFrom(), 
sourceMessage.getReplicatedFrom());
+        Assert.assertEquals(replicator.stats.getBytesOutCount(), 0);
+
         Assert.assertEquals(shadowMessage.getMessageId(), 
sourceMessage.getMessageId());
     }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowTopicRealBkTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowTopicRealBkTest.java
index 9d810b06a7c..b0e572a826c 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowTopicRealBkTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowTopicRealBkTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.service.persistent;
 import com.google.common.collect.Lists;
 import java.util.Optional;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import org.apache.bookkeeper.mledger.impl.ShadowManagedLedgerImpl;
 import org.apache.pulsar.broker.PulsarService;
@@ -74,7 +75,7 @@ public class ShadowTopicRealBkTest {
 
     @Test
     public void testReadFromStorage() throws Exception {
-        final var sourceTopic = 
TopicName.get("test-read-from-source").toString();
+        final var sourceTopic = TopicName.get("test-read-from-source" + 
UUID.randomUUID()).toString();
         final var shadowTopic = sourceTopic + "-shadow";
 
         admin.topics().createNonPartitionedTopic(sourceTopic);
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
index d369d639a73..72a5fd54e85 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
@@ -306,6 +306,13 @@ public class MessageImpl<T> implements Message<T> {
         return msg;
     }
 
+    public static MessageImpl<byte[]> deserializeMetadataWithEmptyPayload(
+            ByteBuf headersAndPayloadWithBrokerEntryMetadata) throws 
IOException {
+        MessageImpl<byte[]> msg = 
deserializeSkipBrokerEntryMetaData(headersAndPayloadWithBrokerEntryMetadata);
+        msg.payload = Unpooled.EMPTY_BUFFER;
+        return msg;
+    }
+
     public void setReplicatedFrom(String cluster) {
         msgMetadata.setReplicatedFrom(cluster);
     }

Reply via email to