BewareMyPower commented on code in PR #23236:
URL: https://github.com/apache/pulsar/pull/23236#discussion_r1740383397
##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowReplicatorTest.java:
##########
@@ -131,7 +131,7 @@ public void testShadowReplication() throws Exception {
Message<byte[]> shadowMessage = shadowConsumer.receive(5, TimeUnit.SECONDS);
- Assert.assertEquals(shadowMessage.getData(), sourceMessage.getData());
+ Assert.assertEquals(shadowMessage.getData().length, 0);
Review Comment:
This is a breaking change. The shadow consumer should still receive the
message.
##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java:
##########
@@ -239,7 +239,8 @@ public void run() {
ManagedLedgerImpl.TOTAL_SIZE_UPDATER.addAndGet(ml, dataLength);
long ledgerId = ledger != null ? ledger.getId() : ((Position) ctx).getLedgerId();
- if (ml.hasActiveCursors()) {
+ // Don't insert to the entry cache for the ShadowManagedLedger
+ if (!(ml instanceof ShadowManagedLedgerImpl) && ml.hasActiveCursors()) {
Review Comment:
The `instanceof` check might cause issues for customized managed ledger
implementations. See a similar issue in
https://github.com/apache/pulsar/pull/23179. Also, if we're going to support
replicating payloads for some specific topics in the future, we would need to
modify this type check.
```java
// Don't insert to the entry cache for empty payload
final var duplicatedData = data.duplicate();
Commands.skipBrokerEntryMetadataIfExist(duplicatedData);
Commands.skipChecksumIfPresent(duplicatedData);
long metadataSize = duplicatedData.readUnsignedInt();
if (duplicatedData.readableBytes() > metadataSize && ml.hasActiveCursors()) {
```
We can just check whether the payload is empty. The code above mimics
`Commands.parseMessageMetadata`, but it only reads a few bytes and does not call
`parseFrom`.
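To make the idea concrete, here is a minimal standalone sketch of the same check using only `java.nio.ByteBuffer`, so it runs without Pulsar or Netty on the classpath. The class `EmptyPayloadCheck`, its methods `hasPayload` and `frame`, and the two magic constants are hypothetical stand-ins for illustration. The assumed layout (optional broker entry metadata, optional checksum, 4-byte metadata size, metadata, payload) is my reading of the entry format and should be checked against `Commands` before relying on it.

```java
import java.nio.ByteBuffer;

// Hypothetical helper, not part of Pulsar: checks whether an entry carries a
// non-empty payload by reading only the fixed-size header fields.
final class EmptyPayloadCheck {
    // Assumed magic numbers marking the optional sections of an entry.
    static final short MAGIC_CRC32C = 0x0e01;
    static final short MAGIC_BROKER_ENTRY_METADATA = 0x0e02;

    static boolean hasPayload(ByteBuffer data) {
        // duplicate() shares the bytes but has its own position, so the
        // caller's buffer is left untouched.
        ByteBuffer buf = data.duplicate();
        // Skip the optional broker entry metadata: magic, 4-byte size, body.
        if (buf.remaining() >= 2 && buf.getShort(buf.position()) == MAGIC_BROKER_ENTRY_METADATA) {
            buf.getShort();
            int brokerMetadataSize = buf.getInt();
            buf.position(buf.position() + brokerMetadataSize);
        }
        // Skip the optional checksum: magic plus a 4-byte checksum value.
        if (buf.remaining() >= 2 && buf.getShort(buf.position()) == MAGIC_CRC32C) {
            buf.position(buf.position() + 2 + 4);
        }
        // The message metadata is length-prefixed; anything after it is payload.
        long metadataSize = Integer.toUnsignedLong(buf.getInt());
        return buf.remaining() > metadataSize;
    }

    // Builds a test frame in the assumed layout (no broker entry metadata).
    static ByteBuffer frame(boolean withChecksum, byte[] metadata, byte[] payload) {
        int size = (withChecksum ? 6 : 0) + 4 + metadata.length + payload.length;
        ByteBuffer buf = ByteBuffer.allocate(size);
        if (withChecksum) {
            buf.putShort(MAGIC_CRC32C);
            buf.putInt(0); // placeholder checksum, never verified in this sketch
        }
        buf.putInt(metadata.length);
        buf.put(metadata);
        buf.put(payload);
        buf.flip();
        return buf;
    }

    public static void main(String[] args) {
        byte[] metadata = {1, 2, 3};
        System.out.println(hasPayload(frame(true, metadata, new byte[0])));
        System.out.println(hasPayload(frame(true, metadata, new byte[] {42})));
    }
}
```

With a check like this, the cache insertion could be guarded by something like `hasPayload(data) && ml.hasActiveCursors()`, which avoids depending on the concrete managed ledger type.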
##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowTopicRealBkTest.java:
##########
@@ -74,7 +75,7 @@ public void cleanup() throws Exception {
@Test
public void testReadFromStorage() throws Exception {
- final var sourceTopic = TopicName.get("test-read-from-source").toString();
+ final var sourceTopic = TopicName.get("test-read-from-source" + UUID.randomUUID()).toString();
Review Comment:
This change seems unnecessary?