This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 3314d70231c [fix] [ml] topic load fail by ledger lost (#19444)
3314d70231c is described below
commit 3314d70231cbed89b6eefa2073ef8c048d84ec16
Author: fengyubiao <[email protected]>
AuthorDate: Tue Feb 21 21:47:28 2023 +0800
[fix] [ml] topic load fail by ledger lost (#19444)
Ensures that a ledger is deleted from BookKeeper (BK) only after it has been
removed from the managed ledger's ledger-info metadata.
---
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 27 +++--
.../mledger/impl/ShadowManagedLedgerImpl.java | 5 +-
.../bookkeeper/mledger/impl/ManagedLedgerTest.java | 114 +++++++++++++++++++++
3 files changed, 135 insertions(+), 11 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index e334adf078a..9c05fb7c104 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -68,6 +68,7 @@ import
java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
+import javax.annotation.Nullable;
import lombok.Getter;
import org.apache.bookkeeper.client.AsyncCallback;
import org.apache.bookkeeper.client.AsyncCallback.CreateCallback;
@@ -471,6 +472,7 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
}
// Calculate total entries and size
+ final List<Long> emptyLedgersToBeDeleted =
Collections.synchronizedList(new ArrayList<>());
Iterator<LedgerInfo> iterator = ledgers.values().iterator();
while (iterator.hasNext()) {
LedgerInfo li = iterator.next();
@@ -479,9 +481,7 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
TOTAL_SIZE_UPDATER.addAndGet(this, li.getSize());
} else {
iterator.remove();
- bookKeeper.asyncDeleteLedger(li.getLedgerId(), (rc, ctx) -> {
- log.info("[{}] Deleted empty ledger ledgerId={} rc={}",
name, li.getLedgerId(), rc);
- }, null);
+ emptyLedgersToBeDeleted.add(li.getLedgerId());
}
}
@@ -497,6 +497,11 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
@Override
public void operationComplete(Void v, Stat stat) {
ledgersStat = stat;
+ emptyLedgersToBeDeleted.forEach(ledgerId -> {
+ bookKeeper.asyncDeleteLedger(ledgerId, (rc, ctx) -> {
+ log.info("[{}] Deleted empty ledger ledgerId={}
rc={}", name, ledgerId, rc);
+ }, null);
+ });
initializeCursors(callback);
}
@@ -1551,11 +1556,12 @@ public class ManagedLedgerImpl implements
ManagedLedger, CreateCallback {
}
ledgersStat = stat;
synchronized (ManagedLedgerImpl.this) {
+ LedgerHandle originalCurrentLedger = currentLedger;
ledgers.put(lh.getId(), newLedger);
currentLedger = lh;
currentLedgerEntries = 0;
currentLedgerSize = 0;
- updateLedgersIdsComplete();
+ updateLedgersIdsComplete(originalCurrentLedger);
mbean.addLedgerSwitchLatencySample(System.currentTimeMillis()
- lastLedgerCreationInitiationTimestamp,
TimeUnit.MILLISECONDS);
}
@@ -1646,8 +1652,15 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
} while (existsOp != null && --pendingSize > 0);
}
- protected synchronized void updateLedgersIdsComplete() {
+ protected synchronized void updateLedgersIdsComplete(@Nullable
LedgerHandle originalCurrentLedger) {
STATE_UPDATER.set(this, State.LedgerOpened);
+ // Delete original "currentLedger" if it has been removed from
"ledgers".
+ if (originalCurrentLedger != null &&
!ledgers.containsKey(originalCurrentLedger.getId())){
+ bookKeeper.asyncDeleteLedger(originalCurrentLedger.getId(), (rc,
ctx) -> {
+ mbean.endDataLedgerDeleteOp();
+ log.info("[{}] Delete complete for empty ledger {}. rc={}",
name, originalCurrentLedger.getId(), rc);
+ }, null);
+ }
updateLastLedgerCreatedTimeAndScheduleRolloverTask();
if (log.isDebugEnabled()) {
@@ -1710,10 +1723,6 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
// The last ledger was empty, so we can discard it
ledgers.remove(lh.getId());
mbean.startDataLedgerDeleteOp();
- bookKeeper.asyncDeleteLedger(lh.getId(), (rc, ctx) -> {
- mbean.endDataLedgerDeleteOp();
- log.info("[{}] Delete complete for empty ledger {}. rc={}",
name, lh.getId(), rc);
- }, null);
}
trimConsumedLedgersInBackground();
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java
index 9a029778fe0..b1f23941347 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java
@@ -30,6 +30,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.AsyncCallback;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
@@ -332,7 +333,7 @@ public class ShadowManagedLedgerImpl extends
ManagedLedgerImpl {
currentLedgerEntries = 0;
currentLedgerSize = 0;
initLastConfirmedEntry();
- updateLedgersIdsComplete();
+ updateLedgersIdsComplete(null);
maybeUpdateCursorBeforeTrimmingConsumedLedger();
} else if (isNoSuchLedgerExistsException(rc)) {
log.warn("[{}] Source ledger not found: {}", name,
lastLedgerId);
@@ -365,7 +366,7 @@ public class ShadowManagedLedgerImpl extends
ManagedLedgerImpl {
}
@Override
- protected synchronized void updateLedgersIdsComplete() {
+ protected synchronized void updateLedgersIdsComplete(LedgerHandle
originalCurrentLedger) {
STATE_UPDATER.set(this, State.LedgerOpened);
updateLastLedgerCreatedTimeAndScheduleRolloverTask();
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 3b011fe8d56..a4d8b75d00c 100644
---
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -48,6 +48,7 @@ import java.nio.ReadOnlyBufferException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -75,7 +76,9 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import lombok.Cleanup;
+import lombok.Data;
import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.AsyncCallback;
import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
@@ -144,6 +147,117 @@ public class ManagedLedgerTest extends
MockedBookKeeperTestCase {
return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } };
}
+ /**
+ * Replaces {@code ml.currentLedger} with a spy whose {@code asyncAddEntry} forwards the
+ * entry to the real (non-spy) ledger handle, but always reports
+ * {@code BKException.Code.TimeoutException} (with entryId -1) to the caller's callback.
+ * The real result code is ignored, so a write that the real ledger completed is seen by
+ * the managed ledger as a timed-out add.
+ *
+ * @param ml the managed ledger whose current ledger handle is swapped for the spy
+ * @param addEntryFinished set to {@code true} once the forwarded real write has completed
+ */
+ private void makeAddEntryTimeout(ManagedLedgerImpl ml, AtomicBoolean
addEntryFinished) throws Exception {
+ LedgerHandle currentLedger = ml.currentLedger;
+ final LedgerHandle spyLedgerHandle = spy(currentLedger);
+ doAnswer(invocation -> {
+ ByteBuf bs = (ByteBuf) invocation.getArguments()[0];
+ AddCallback addCallback = (AddCallback)
invocation.getArguments()[1];
+ Object originalContext = invocation.getArguments()[2];
+ // Perform the real write on the original handle, not on the spy.
+ currentLedger.asyncAddEntry(bs, (rc, lh, entryId, ctx) -> {
+ addEntryFinished.set(true);
+ // Discard the real rc: the caller always sees a timeout, even though the write completed.
+ addCallback.addComplete(BKException.Code.TimeoutException,
spyLedgerHandle, -1, ctx);
+ }, originalContext);
+ return null;
+ }).when(spyLedgerHandle).asyncAddEntry(any(ByteBuf.class),
any(AddCallback.class), any());
+ ml.currentLedger = spyLedgerHandle;
+ }
+
+ /**
+ * State shared between the {@code asyncDeleteLedger} stub installed by
+ * {@code makeDelayIfDoLedgerDelete} and the test thread. The fields are volatile because
+ * the stub writes them (the completion partly on a separately started thread) while the
+ * test thread reads them.
+ */
+ @Data
+ private static class DeleteLedgerInfo{
+ // True once a delete of the watched ledger has been intercepted.
+ volatile boolean hasCalled;
+ // Completed when the delayed delete of the watched ledger reports back.
+ volatile CompletableFuture<Void> future = new CompletableFuture<>();
+ }
+
+ /**
+ * Stubs {@code spyBookKeeper.asyncDeleteLedger} so that deleting {@code ledger} is held back
+ * until {@code signal} becomes true. Deletes of any other ledger are passed straight to the
+ * real {@code bkc}.
+ *
+ * <p>When the watched ledger's delete is requested, {@code hasCalled} is set. A new thread then
+ * waits (up to 60 seconds) for {@code signal} before issuing the real delete. When that delete
+ * completes, the returned {@code future} is completed first, and then the caller's original
+ * callback is invoked.
+ *
+ * <p>NOTE(review): if {@code signal} is never set, Awaitility's {@code until} is expected to
+ * throw on the background thread. In that case the delete is never issued and {@code future}
+ * never completes.
+ *
+ * @param ledger the ledger whose deletion should be delayed
+ * @param signal gate that releases the delayed delete once it becomes {@code true}
+ * @param spyBookKeeper the spied BookKeeper client whose {@code asyncDeleteLedger} is stubbed
+ * @return state recording whether the delayed delete was requested and when it completed
+ */
+ private DeleteLedgerInfo makeDelayIfDoLedgerDelete(LedgerHandle ledger,
final AtomicBoolean signal,
+ BookKeeper
spyBookKeeper) {
+ DeleteLedgerInfo deleteLedgerInfo = new DeleteLedgerInfo();
+ doAnswer(invocation -> {
+ long ledgerId = (long) invocation.getArguments()[0];
+ AsyncCallback.DeleteCallback originalCb =
(AsyncCallback.DeleteCallback) invocation.getArguments()[1];
+ // Wrapper callback: completes the future before delegating to the caller's callback.
+ AsyncCallback.DeleteCallback cb = (rc, ctx) -> {
+ if (deleteLedgerInfo.hasCalled) {
+ deleteLedgerInfo.future.complete(null);
+ }
+ originalCb.deleteComplete(rc, ctx);
+ };
+ Object ctx = invocation.getArguments()[2];
+ // Only the watched ledger is delayed; every other delete goes through immediately.
+ if (ledgerId != ledger.getId()){
+ bkc.asyncDeleteLedger(ledgerId, originalCb, ctx);
+ } else {
+ deleteLedgerInfo.hasCalled = true;
+ // Wait on a separate thread so the caller of asyncDeleteLedger is not blocked.
+ new Thread(() -> {
+
Awaitility.await().atMost(Duration.ofSeconds(60)).until(signal::get);
+ bkc.asyncDeleteLedger(ledgerId, cb, ctx);
+ }).start();
+ }
+ return null;
+ }).when(spyBookKeeper).asyncDeleteLedger(any(long.class),
any(AsyncCallback.DeleteCallback.class), any());
+ return deleteLedgerInfo;
+ }
+
+ /**
+ * Simulates the following problems, which can occur when ZooKeeper (ZK) connections are
+ * unstable:
+ * - an add-entry request times out although the entry was actually written;
+ * - the ZK write that updates the ML's ledger info fails during the resulting ledger switch.
+ * Verifies that, after these problems, every ledger still listed in the ML's ledger info can
+ * still be opened from BookKeeper, i.e. no ledger referenced by the metadata was deleted.
+ */
+ @Test
+ public void testLedgerInfoMetaCorrectIfAddEntryTimeOut() throws Exception {
+ String mlName = "testLedgerInfoMetaCorrectIfAddEntryTimeOut";
+ BookKeeper spyBookKeeper = spy(bkc);
+ ManagedLedgerFactoryImpl factory = new
ManagedLedgerFactoryImpl(metadataStore, spyBookKeeper);
+ ManagedLedgerImpl ml = (ManagedLedgerImpl) factory.open(mlName);
+
+ // Make add entry time out (the data write itself actually succeeds).
+ AtomicBoolean addEntryFinished = new AtomicBoolean(false);
+ makeAddEntryTimeout(ml, addEntryFinished);
+
+ // Make the ledger-info update fail during the ledger switch: once the real add has
+ // finished, every PUT on this ML's metadata path fails with BadVersionException.
+ metadataStore.failConditional(new
MetadataStoreException.BadVersionException(""), (opType, path) -> {
+ if (opType == FaultInjectionMetadataStore.OperationType.PUT &&
addEntryFinished.get()
+ &&
"/managed-ledgers/testLedgerInfoMetaCorrectIfAddEntryTimeOut".equals(path)) {
+ return true;
+ }
+ return false;
+ });
+
+ // Hold back any delete of the current ledger until deleteLedgerDelaySignal is set.
+ AtomicBoolean deleteLedgerDelaySignal = new AtomicBoolean(false);
+ DeleteLedgerInfo deleteLedgerInfo =
+ makeDelayIfDoLedgerDelete(ml.currentLedger,
deleteLedgerDelaySignal, spyBookKeeper);
+
+ // Add one entry:
+ // - it fails with the mocked timeout, which triggers a ledger switch;
+ // - the ledger switch also fails, because of the mocked metadata error.
+ try {
+ ml.addEntry("1".getBytes(Charset.defaultCharset()));
+ fail("Expected the operation of add entry will fail by timeout or
ledger fenced.");
+ } catch (Exception e){
+ // Expected exception; ignored.
+ }
+
+ // Close the ML (expected to fail), then reopen it from the same factory.
+ try {
+ ml.close();
+ fail("Expected the operation of ml close will fail by fenced
state.");
+ } catch (Exception e){
+ // Expected exception; ignored.
+ }
+ ManagedLedgerImpl mlReopened = (ManagedLedgerImpl)
factory.open(mlName);
+ // Release the delayed delete; if it was requested, wait until it has completed.
+ deleteLedgerDelaySignal.set(true);
+ if (deleteLedgerInfo.hasCalled){
+ deleteLedgerInfo.future.join();
+ }
+ mlReopened.close();
+
+ // Verify: every ledger in the reopened ML's ledger info can still be opened from BK
+ // (openLedger is expected to throw if the ledger had been deleted).
+ for (long ledgerId : mlReopened.getLedgersInfo().keySet()){
+ LedgerHandle lh = bkc.openLedger(ledgerId, ml.digestType,
ml.getConfig().getPassword());
+ lh.close();
+ }
+ // NOTE(review): the factory created above is never shut down; consider factory.shutdown().
+ }
+
@Test
public void managedLedgerApi() throws Exception {
ManagedLedger ledger = factory.open("my_test_ledger");