This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new de68e2511e0 [improve][broker] Optimize message payload traffic for
ShadowReplicator (#23236)
de68e2511e0 is described below
commit de68e2511e016fa009852625ce21d1bffe47bf21
Author: Zike Yang <[email protected]>
AuthorDate: Thu Sep 5 11:07:40 2024 +0800
[improve][broker] Optimize message payload traffic for ShadowReplicator
(#23236)
---
build/run_unit_group.sh | 2 +-
.../main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java | 3 ++-
.../apache/pulsar/broker/service/persistent/ShadowReplicator.java | 6 +++---
.../pulsar/broker/service/persistent/ShadowReplicatorTest.java | 4 ++--
.../pulsar/broker/service/persistent/ShadowTopicRealBkTest.java | 3 ++-
.../src/main/java/org/apache/pulsar/client/impl/MessageImpl.java | 7 +++++++
6 files changed, 17 insertions(+), 8 deletions(-)
diff --git a/build/run_unit_group.sh b/build/run_unit_group.sh
index 2694505e0e0..cdaf69e351b 100755
--- a/build/run_unit_group.sh
+++ b/build/run_unit_group.sh
@@ -80,7 +80,7 @@ function test_group_broker_group_1() {
}
function test_group_broker_group_2() {
- mvn_test -pl pulsar-broker
-Dgroups='schema,utils,functions-worker,broker-io,broker-discovery,broker-compaction,broker-naming,websocket,other'
+ mvn_test -pl pulsar-broker
-Dgroups='schema,utils,functions-worker,broker-io,broker-discovery,broker-compaction,broker-naming,broker-replication,websocket,other'
}
function test_group_broker_group_3() {
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
index 539b62fe7fe..3f0699657b5 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
@@ -239,7 +239,8 @@ public class OpAddEntry implements AddCallback,
CloseCallback, Runnable {
ManagedLedgerImpl.TOTAL_SIZE_UPDATER.addAndGet(ml, dataLength);
long ledgerId = ledger != null ? ledger.getId() : ((Position)
ctx).getLedgerId();
- if (ml.hasActiveCursors()) {
+ // Don't insert to the entry cache for the ShadowManagedLedger
+ if (!(ml instanceof ShadowManagedLedgerImpl) && ml.hasActiveCursors())
{
// Avoid caching entries if no cursor has been created
EntryImpl entry = EntryImpl.create(ledgerId, entryId, data);
// EntryCache.insert: duplicates entry by allocating new entry and
data. so, recycle entry after calling
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ShadowReplicator.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ShadowReplicator.java
index 25591857aa1..65bcbfd131f 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ShadowReplicator.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ShadowReplicator.java
@@ -67,7 +67,7 @@ public class ShadowReplicator extends PersistentReplicator {
ByteBuf headersAndPayload = entry.getDataBuffer();
MessageImpl msg;
try {
- msg =
MessageImpl.deserializeSkipBrokerEntryMetaData(headersAndPayload);
+ msg =
MessageImpl.deserializeMetadataWithEmptyPayload(headersAndPayload);
} catch (Throwable t) {
log.error("[{}] Failed to deserialize message at {}
(buffer size: {}): {}", replicatorId,
entry.getPosition(), length, t.getMessage(), t);
@@ -91,9 +91,9 @@ public class ShadowReplicator extends PersistentReplicator {
dispatchRateLimiter.ifPresent(rateLimiter ->
rateLimiter.consumeDispatchQuota(1, entry.getLength()));
- msgOut.recordEvent(headersAndPayload.readableBytes());
+ msgOut.recordEvent(msg.getDataBuffer().readableBytes());
stats.incrementMsgOutCounter();
-
stats.incrementBytesOutCounter(headersAndPayload.readableBytes());
+
stats.incrementBytesOutCounter(msg.getDataBuffer().readableBytes());
msg.setReplicatedFrom(localCluster);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowReplicatorTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowReplicatorTest.java
index 9f1885e034d..511cf87133a 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowReplicatorTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowReplicatorTest.java
@@ -142,8 +142,8 @@ public class ShadowReplicatorTest extends BrokerTestBase {
Assert.assertEquals(shadowMessage.getBrokerPublishTime(),
sourceMessage.getBrokerPublishTime());
Assert.assertEquals(shadowMessage.getIndex(),
sourceMessage.getIndex());
- //`replicatedFrom` is set as localClusterName in shadow topic.
- Assert.assertNotEquals(shadowMessage.getReplicatedFrom(),
sourceMessage.getReplicatedFrom());
+ Assert.assertEquals(replicator.stats.getBytesOutCount(), 0);
+
Assert.assertEquals(shadowMessage.getMessageId(),
sourceMessage.getMessageId());
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowTopicRealBkTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowTopicRealBkTest.java
index 9d810b06a7c..b0e572a826c 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowTopicRealBkTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowTopicRealBkTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.service.persistent;
import com.google.common.collect.Lists;
import java.util.Optional;
import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.mledger.impl.ShadowManagedLedgerImpl;
import org.apache.pulsar.broker.PulsarService;
@@ -74,7 +75,7 @@ public class ShadowTopicRealBkTest {
@Test
public void testReadFromStorage() throws Exception {
- final var sourceTopic =
TopicName.get("test-read-from-source").toString();
+ final var sourceTopic = TopicName.get("test-read-from-source" +
UUID.randomUUID()).toString();
final var shadowTopic = sourceTopic + "-shadow";
admin.topics().createNonPartitionedTopic(sourceTopic);
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
index d369d639a73..72a5fd54e85 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
@@ -306,6 +306,13 @@ public class MessageImpl<T> implements Message<T> {
return msg;
}
+ public static MessageImpl<byte[]> deserializeMetadataWithEmptyPayload(
+ ByteBuf headersAndPayloadWithBrokerEntryMetadata) throws
IOException {
+ MessageImpl<byte[]> msg =
deserializeSkipBrokerEntryMetaData(headersAndPayloadWithBrokerEntryMetadata);
+ msg.payload = Unpooled.EMPTY_BUFFER;
+ return msg;
+ }
+
public void setReplicatedFrom(String cluster) {
msgMetadata.setReplicatedFrom(cluster);
}