This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.7 by this push:
new ef68e59 fix the closed ledger did not delete after expired (#9136)
ef68e59 is described below
commit ef68e5904ea913d4a4b198533d75768c7abb8b49
Author: WangJialing <[email protected]>
AuthorDate: Sun Jan 10 15:11:50 2021 +0800
fix the closed ledger did not delete after expired (#9136)
Fixes #9057
When current ledger closed, if there is no incoming traffic, the read
position of the cursor is still point to the last entry of the closed ledger,
that casue the `slowestReaderLedgerId` point to the closed ledger in
`internalTrimConsumedLedgers()` and fail to delete the closed ledger.
When close current ledger, if cursor's mark delete position point to the
last entry of current ledger, move the read position to the new created ledger.
add test case: testDeletionAfterLedgerClosedAndRetention()
(cherry picked from commit 0e5c5362821515995188f0da0c27181aa25e4f6c)
---
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 10 ++++++
.../bookkeeper/mledger/impl/ManagedLedgerTest.java | 36 ++++++++++++++++++----
.../service/CurrentLedgerRolloverIfFullTest.java | 14 +++------
3 files changed, 44 insertions(+), 16 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 4fe5318..1e6b898 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -1273,6 +1273,8 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
} else {
log.info("[{}] Created new ledger {}", name, lh.getId());
ledgers.put(lh.getId(),
LedgerInfo.newBuilder().setLedgerId(lh.getId()).setTimestamp(0).build());
+ final long previousEntries = currentLedgerEntries;
+ final long previousLedgerId = currentLedger.getId();
currentLedger = lh;
currentLedgerEntries = 0;
currentLedgerSize = 0;
@@ -1290,6 +1292,14 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
mbean.addLedgerSwitchLatencySample(System.currentTimeMillis() -
lastLedgerCreationInitiationTimestamp,
TimeUnit.MILLISECONDS);
}
+ // Move cursor read point to new ledger
+ for (ManagedCursor cursor : cursors) {
+ PositionImpl markDeletedPosition = (PositionImpl)
cursor.getMarkDeletedPosition();
+ if (markDeletedPosition.getLedgerId() ==
previousLedgerId && markDeletedPosition.getEntryId() + 1 >= previousEntries) {
+ // All entries in last ledger are marked delete,
move read point to the new ledger
+ updateCursor((ManagedCursorImpl) cursor,
PositionImpl.get(currentLedger.getId(), -1));
+ }
+ }
}
@Override
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 246e06a..1cc95b4 100644
---
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -18,7 +18,6 @@
*/
package org.apache.bookkeeper.mledger.impl;
-import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
@@ -34,12 +33,10 @@ import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertSame;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
-
import com.google.common.base.Charsets;
import com.google.common.collect.Sets;
-
import io.netty.buffer.ByteBufAllocator;
-
+import io.netty.util.concurrent.DefaultThreadFactory;
import java.lang.reflect.Field;
import java.nio.charset.Charset;
import java.security.GeneralSecurityException;
@@ -64,8 +61,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
-
-import io.netty.util.concurrent.DefaultThreadFactory;
import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
@@ -108,6 +103,7 @@ import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.commons.lang3.tuple.Pair;
import
org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
+import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig;
import org.apache.pulsar.metadata.api.Stat;
import org.apache.pulsar.metadata.impl.zookeeper.ZKMetadataStore;
import org.apache.zookeeper.CreateMode;
@@ -1836,6 +1832,34 @@ public class ManagedLedgerTest extends
MockedBookKeeperTestCase {
ml.close();
}
+ @Test
+ public void testDeletionAfterLedgerClosedAndRetention() throws Exception {
+ ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(bkc,
bkc.getZkHandle());
+ ManagedLedgerConfig config = new ManagedLedgerConfig();
+ config.setRetentionSizeInMB(0);
+ config.setMaxEntriesPerLedger(1);
+ config.setRetentionTime(1, TimeUnit.SECONDS);
+ config.setMaximumRolloverTime(1, TimeUnit.SECONDS);
+
+ ManagedLedgerImpl ml = (ManagedLedgerImpl)
factory.open("deletion_after_retention_test_ledger", config);
+ ManagedCursor c1 = ml.openCursor("testCursor1");
+ ManagedCursor c2 = ml.openCursor("testCursor2");
+ ml.addEntry("iamaverylongmessagethatshouldnotberetained".getBytes());
+ c1.skipEntries(1, IndividualDeletedEntries.Exclude);
+ c2.skipEntries(1, IndividualDeletedEntries.Exclude);
+ // let current ledger close
+ ml.rollCurrentLedgerIfFull();
+ // let retention expire
+ Thread.sleep(1500);
+ // delete the expired ledger
+
ml.internalTrimConsumedLedgers(CompletableFuture.completedFuture(null));
+
+ // the closed and expired ledger should be deleted
+ assertTrue(ml.getLedgersInfoAsList().size() <= 1);
+ assertEquals(ml.getTotalSize(), 0);
+ ml.close();
+ }
+
/**
* Set retention time = 0 and create a empty ledger,
* first position can't higher than last after trim ledgers.
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CurrentLedgerRolloverIfFullTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CurrentLedgerRolloverIfFullTest.java
index b70a595..783eac5 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CurrentLedgerRolloverIfFullTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CurrentLedgerRolloverIfFullTest.java
@@ -18,10 +18,10 @@
*/
package org.apache.pulsar.broker.service;
+import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
-import org.apache.bookkeeper.mledger.util.Futures;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
@@ -29,8 +29,6 @@ import org.apache.pulsar.client.api.Producer;
import org.junit.Test;
import org.testng.Assert;
-import java.util.concurrent.TimeUnit;
-
public class CurrentLedgerRolloverIfFullTest extends BrokerTestBase {
@Override
protected void setup() throws Exception {
@@ -90,14 +88,10 @@ public class CurrentLedgerRolloverIfFullTest extends
BrokerTestBase {
Assert.assertNotEquals(managedLedger.getCurrentLedgerSize(), 0);
// trigger a ledger rollover
- // and now we have two ledgers, one with expired data and one for empty
+ // the last ledger will be closed and removed and we have one ledger
for empty
managedLedger.rollCurrentLedgerIfFull();
Thread.sleep(1000);
- Assert.assertEquals(managedLedger.getLedgersInfoAsList().size(), 2);
-
- // trigger a ledger trimming
- // and now we only have the empty ledger
- managedLedger.trimConsumedLedgersInBackground(Futures.NULL_PROMISE);
- Assert.assertEquals(managedLedger.getCurrentLedgerSize(), 0);
+ Assert.assertEquals(managedLedger.getLedgersInfoAsList().size(), 1);
+ Assert.assertEquals(managedLedger.getTotalSize(), 0);
}
}