lordcheng10 commented on a change in pull request #14985:
URL: https://github.com/apache/pulsar/pull/14985#discussion_r841077550
##########
File path:
managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
##########
@@ -291,6 +291,189 @@ public void acknowledge1() throws Exception {
ledger.close();
}
+ @Test
+ public void testCacheEvictionByMarkDeletedPosition() throws Throwable {
+ final CountDownLatch counter = new CountDownLatch(1);
+ ManagedLedgerConfig config = new ManagedLedgerConfig();
+ config.setCacheEvictionByMarkDeletedPosition(true);
+ factory.updateCacheEvictionTimeThreshold(TimeUnit.MILLISECONDS
+ .toNanos(30000));
+ factory.asyncOpen("my_test_ledger", config, new OpenLedgerCallback() {
+ @Override
+ public void openLedgerComplete(ManagedLedger ledger, Object ctx) {
+ ledger.asyncOpenCursor("test-cursor", new OpenCursorCallback()
{
+ @Override
+ public void openCursorComplete(ManagedCursor cursor,
Object ctx) {
+ ManagedLedger ledger = (ManagedLedger) ctx;
+ String message1 = "test";
+ ledger.asyncAddEntry(message1.getBytes(Encoding), new
AddEntryCallback() {
+ @Override
+ public void addComplete(Position position, ByteBuf
entryData, Object ctx) {
+ @SuppressWarnings("unchecked")
+ Pair<ManagedLedger, ManagedCursor> pair =
(Pair<ManagedLedger, ManagedCursor>) ctx;
+ ManagedLedger ledger = pair.getLeft();
+ ManagedCursor cursor = pair.getRight();
+ assertEquals(((ManagedLedgerImpl)
ledger).getCacheSize(), message1.getBytes(Encoding).length);
+
+ cursor.asyncReadEntries(1, new
ReadEntriesCallback() {
+ @Override
+ public void
readEntriesComplete(List<Entry> entries, Object ctx) {
+ ManagedCursor cursor = (ManagedCursor)
ctx;
+ assertEquals(entries.size(), 1);
+ Entry entry = entries.get(0);
+ final Position position =
entry.getPosition();
+ assertEquals(new
String(entry.getDataAndRelease(), Encoding), message1);
+ assertEquals(((ManagedLedgerImpl)
ledger).getCacheSize(), message1.getBytes(Encoding).length);
+
+ log.debug("Mark-Deleting to position
{}", position);
+ cursor.asyncMarkDelete(position, new
MarkDeleteCallback() {
+ @Override
+ public void
markDeleteComplete(Object ctx) {
+ log.debug("Mark delete
complete");
+ ManagedCursor cursor =
(ManagedCursor) ctx;
+
assertFalse(cursor.hasMoreEntries());
+ // wait eviction finish.
+ try {
Review comment:
Fixed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]