This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new 2db23b8 [Managed Ledger] Fix the incorrect total size when
BrokerEntryMetadata is enabled (#12714)
2db23b8 is described below
commit 2db23b8dd3eb859cc16e30686578be275026c347
Author: Yunze Xu <[email protected]>
AuthorDate: Thu Nov 11 17:46:55 2021 +0800
[Managed Ledger] Fix the incorrect total size when BrokerEntryMetadata is
enabled (#12714)
### Motivation
When the BrokerEntryMetadata is enabled, the total size in
`ManagedLedgerImpl` is inaccurate. Because when the total size is updated in
`OpAddEntry#safeRun`, the `dataLength` is the initial size of `data` when
`OpAddEntry` is constructed, but `data` could be changed via `setData` method.
The inaccurate total size could affect the retention size validation.
Because in `ManagedLedgerImpl#internalTrimLedgers`, the total size reduces by
the size of `LedgerInfo`, which is assigned from the
`LedgerHandle#getLength()`. Therefore, the total size will become 0 or less
before all ledgers are removed.
### Modifications
- Update `dataLength` field in `setData` method.
- Add a `testManagedLedgerTotalSize` test to `BrokerEntryMetadataE2ETest`.
It produces 10 messages and trigger the rollover manually so that the first
`LedgerInfo` of the managed ledger contains the correct total bytes. Then
compare the `totalSize` field with it to verify this fix works.
(cherry picked from commit 5dbb7d25849f3a037aa522b5d0767801aa0a5096)
---
.../apache/bookkeeper/mledger/impl/OpAddEntry.java | 1 +
.../broker/service/BrokerEntryMetadataE2ETest.java | 45 ++++++++++++++++++++++
2 files changed, 46 insertions(+)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
index 4f36d56..0b96b59 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
@@ -313,6 +313,7 @@ public class OpAddEntry extends SafeRunnable implements
AddCallback, CloseCallba
}
public void setData(ByteBuf data) {
+ this.dataLength = data.readableBytes();
this.data = data;
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java
index d2ea2f1..49b4742 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java
@@ -18,18 +18,26 @@
*/
package org.apache.pulsar.broker.service;
+import static
org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
+
+import java.time.Duration;
import java.util.List;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.common.api.proto.BrokerEntryMetadata;
import org.assertj.core.util.Sets;
+import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -354,4 +362,41 @@ public class BrokerEntryMetadataE2ETest extends
BrokerTestBase {
producer.close();
consumer.close();
}
+
+ @Test
+ public void testManagedLedgerTotalSize() throws Exception {
+ final String topic = newTopicName();
+ final int messages = 10;
+
+ admin.topics().createNonPartitionedTopic(topic);
+ admin.lookups().lookupTopic(topic);
+ final ManagedLedgerImpl managedLedger =
pulsar.getBrokerService().getTopicIfExists(topic).get()
+ .map(topicObject -> (ManagedLedgerImpl) ((PersistentTopic)
topicObject).getManagedLedger())
+ .orElse(null);
+ Assert.assertNotNull(managedLedger);
+ final ManagedCursor cursor = managedLedger.openCursor("cursor"); //
prevent ledgers being removed
+
+ @Cleanup
+ final Producer<String> producer =
pulsarClient.newProducer(Schema.STRING)
+ .topic(topic)
+ .create();
+ for (int i = 0; i < messages; i++) {
+ producer.send("msg-" + i);
+ }
+
+ Assert.assertTrue(managedLedger.getTotalSize() > 0);
+
+ managedLedger.getConfig().setMinimumRolloverTime(0,
TimeUnit.MILLISECONDS);
+ managedLedger.getConfig().setMaxEntriesPerLedger(1);
+ managedLedger.rollCurrentLedgerIfFull();
+
+ Awaitility.await().atMost(Duration.ofSeconds(3))
+ .until(() -> managedLedger.getLedgersInfo().size() > 1);
+
+ final List<LedgerInfo> ledgerInfoList =
managedLedger.getLedgersInfoAsList();
+ Assert.assertEquals(ledgerInfoList.size(), 2);
+ Assert.assertEquals(ledgerInfoList.get(0).getSize(),
managedLedger.getTotalSize());
+
+ cursor.close();
+ }
}