This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 3314d70231c [fix] [ml] topic load fail by ledger lost (#19444)
3314d70231c is described below

commit 3314d70231cbed89b6eefa2073ef8c048d84ec16
Author: fengyubiao <[email protected]>
AuthorDate: Tue Feb 21 21:47:28 2023 +0800

    [fix] [ml] topic load fail by ledger lost (#19444)
    
    Ensures that a ledger is deleted from BookKeeper only after it has been
    removed from the managed ledger's ledger-info metadata.
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  27 +++--
 .../mledger/impl/ShadowManagedLedgerImpl.java      |   5 +-
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 114 +++++++++++++++++++++
 3 files changed, 135 insertions(+), 11 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index e334adf078a..9c05fb7c104 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -68,6 +68,7 @@ import 
java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import java.util.function.Predicate;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
+import javax.annotation.Nullable;
 import lombok.Getter;
 import org.apache.bookkeeper.client.AsyncCallback;
 import org.apache.bookkeeper.client.AsyncCallback.CreateCallback;
@@ -471,6 +472,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
         }
 
         // Calculate total entries and size
+        final List<Long> emptyLedgersToBeDeleted = 
Collections.synchronizedList(new ArrayList<>());
         Iterator<LedgerInfo> iterator = ledgers.values().iterator();
         while (iterator.hasNext()) {
             LedgerInfo li = iterator.next();
@@ -479,9 +481,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
                 TOTAL_SIZE_UPDATER.addAndGet(this, li.getSize());
             } else {
                 iterator.remove();
-                bookKeeper.asyncDeleteLedger(li.getLedgerId(), (rc, ctx) -> {
-                    log.info("[{}] Deleted empty ledger ledgerId={} rc={}", 
name, li.getLedgerId(), rc);
-                }, null);
+                emptyLedgersToBeDeleted.add(li.getLedgerId());
             }
         }
 
@@ -497,6 +497,11 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
             @Override
             public void operationComplete(Void v, Stat stat) {
                 ledgersStat = stat;
+                emptyLedgersToBeDeleted.forEach(ledgerId -> {
+                    bookKeeper.asyncDeleteLedger(ledgerId, (rc, ctx) -> {
+                        log.info("[{}] Deleted empty ledger ledgerId={} 
rc={}", name, ledgerId, rc);
+                    }, null);
+                });
                 initializeCursors(callback);
             }
 
@@ -1551,11 +1556,12 @@ public class ManagedLedgerImpl implements 
ManagedLedger, CreateCallback {
                     }
                     ledgersStat = stat;
                     synchronized (ManagedLedgerImpl.this) {
+                        LedgerHandle originalCurrentLedger = currentLedger;
                         ledgers.put(lh.getId(), newLedger);
                         currentLedger = lh;
                         currentLedgerEntries = 0;
                         currentLedgerSize = 0;
-                        updateLedgersIdsComplete();
+                        updateLedgersIdsComplete(originalCurrentLedger);
                         
mbean.addLedgerSwitchLatencySample(System.currentTimeMillis()
                                 - lastLedgerCreationInitiationTimestamp, 
TimeUnit.MILLISECONDS);
                     }
@@ -1646,8 +1652,15 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
         } while (existsOp != null && --pendingSize > 0);
     }
 
-    protected synchronized void updateLedgersIdsComplete() {
+    protected synchronized void updateLedgersIdsComplete(@Nullable 
LedgerHandle originalCurrentLedger) {
         STATE_UPDATER.set(this, State.LedgerOpened);
+        // Delete original "currentLedger" if it has been removed from 
"ledgers".
+        if (originalCurrentLedger != null && 
!ledgers.containsKey(originalCurrentLedger.getId())){
+            bookKeeper.asyncDeleteLedger(originalCurrentLedger.getId(), (rc, 
ctx) -> {
+                mbean.endDataLedgerDeleteOp();
+                log.info("[{}] Delete complete for empty ledger {}. rc={}", 
name, originalCurrentLedger.getId(), rc);
+            }, null);
+        }
         updateLastLedgerCreatedTimeAndScheduleRolloverTask();
 
         if (log.isDebugEnabled()) {
@@ -1710,10 +1723,6 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
             // The last ledger was empty, so we can discard it
             ledgers.remove(lh.getId());
             mbean.startDataLedgerDeleteOp();
-            bookKeeper.asyncDeleteLedger(lh.getId(), (rc, ctx) -> {
-                mbean.endDataLedgerDeleteOp();
-                log.info("[{}] Delete complete for empty ledger {}. rc={}", 
name, lh.getId(), rc);
-            }, null);
         }
 
         trimConsumedLedgersInBackground();
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java
index 9a029778fe0..b1f23941347 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java
@@ -30,6 +30,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.client.AsyncCallback;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
@@ -332,7 +333,7 @@ public class ShadowManagedLedgerImpl extends 
ManagedLedgerImpl {
                             currentLedgerEntries = 0;
                             currentLedgerSize = 0;
                             initLastConfirmedEntry();
-                            updateLedgersIdsComplete();
+                            updateLedgersIdsComplete(null);
                             maybeUpdateCursorBeforeTrimmingConsumedLedger();
                         } else if (isNoSuchLedgerExistsException(rc)) {
                             log.warn("[{}] Source ledger not found: {}", name, 
lastLedgerId);
@@ -365,7 +366,7 @@ public class ShadowManagedLedgerImpl extends 
ManagedLedgerImpl {
     }
 
     @Override
-    protected synchronized void updateLedgersIdsComplete() {
+    protected synchronized void updateLedgersIdsComplete(LedgerHandle 
originalCurrentLedger) {
         STATE_UPDATER.set(this, State.LedgerOpened);
         updateLastLedgerCreatedTimeAndScheduleRolloverTask();
 
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 3b011fe8d56..a4d8b75d00c 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -48,6 +48,7 @@ import java.nio.ReadOnlyBufferException;
 import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
 import java.security.GeneralSecurityException;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -75,7 +76,9 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Predicate;
 import lombok.Cleanup;
+import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.AsyncCallback;
 import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
@@ -144,6 +147,117 @@ public class ManagedLedgerTest extends 
MockedBookKeeperTestCase {
         return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } };
     }
 
+    private void makeAddEntryTimeout(ManagedLedgerImpl ml, AtomicBoolean 
addEntryFinished) throws Exception {
+        LedgerHandle currentLedger = ml.currentLedger;
+        final LedgerHandle spyLedgerHandle = spy(currentLedger);
+        doAnswer(invocation -> {
+            ByteBuf bs = (ByteBuf) invocation.getArguments()[0];
+            AddCallback addCallback = (AddCallback) 
invocation.getArguments()[1];
+            Object originalContext = invocation.getArguments()[2];
+            currentLedger.asyncAddEntry(bs, (rc, lh, entryId, ctx) -> {
+                addEntryFinished.set(true);
+                addCallback.addComplete(BKException.Code.TimeoutException, 
spyLedgerHandle,  -1, ctx);
+            }, originalContext);
+            return null;
+        }).when(spyLedgerHandle).asyncAddEntry(any(ByteBuf.class), 
any(AddCallback.class), any());
+        ml.currentLedger = spyLedgerHandle;
+    }
+
+    @Data
+    private static class DeleteLedgerInfo{
+        volatile boolean hasCalled;
+        volatile CompletableFuture<Void> future = new CompletableFuture<>();
+    }
+
+    private DeleteLedgerInfo makeDelayIfDoLedgerDelete(LedgerHandle ledger, 
final AtomicBoolean signal,
+                                                              BookKeeper 
spyBookKeeper) {
+        DeleteLedgerInfo deleteLedgerInfo = new DeleteLedgerInfo();
+        doAnswer(invocation -> {
+            long ledgerId = (long) invocation.getArguments()[0];
+            AsyncCallback.DeleteCallback originalCb = 
(AsyncCallback.DeleteCallback) invocation.getArguments()[1];
+            AsyncCallback.DeleteCallback cb = (rc, ctx) -> {
+                if (deleteLedgerInfo.hasCalled) {
+                    deleteLedgerInfo.future.complete(null);
+                }
+                originalCb.deleteComplete(rc, ctx);
+            };
+            Object ctx = invocation.getArguments()[2];
+            if (ledgerId != ledger.getId()){
+                bkc.asyncDeleteLedger(ledgerId, originalCb, ctx);
+            } else {
+                deleteLedgerInfo.hasCalled = true;
+                new Thread(() -> {
+                    
Awaitility.await().atMost(Duration.ofSeconds(60)).until(signal::get);
+                    bkc.asyncDeleteLedger(ledgerId, cb, ctx);
+                }).start();
+            }
+            return null;
+        }).when(spyBookKeeper).asyncDeleteLedger(any(long.class), 
any(AsyncCallback.DeleteCallback.class), any());
+        return deleteLedgerInfo;
+    }
+
+    /***
+     * This test simulates the following problems that can occur when ZK 
connections are unstable:
+     *  - add entry timeout
+     *  - write ZK fail when update ledger info of ML
+     * and verifies that ledger info of ML is still correct when the above 
problems occur.
+     */
+    @Test
+    public void testLedgerInfoMetaCorrectIfAddEntryTimeOut() throws Exception {
+        String mlName = "testLedgerInfoMetaCorrectIfAddEntryTimeOut";
+        BookKeeper spyBookKeeper = spy(bkc);
+        ManagedLedgerFactoryImpl factory = new 
ManagedLedgerFactoryImpl(metadataStore, spyBookKeeper);
+        ManagedLedgerImpl ml = (ManagedLedgerImpl) factory.open(mlName);
+
+        // Make add entry timeout(The data write was actually successful).
+        AtomicBoolean addEntryFinished = new AtomicBoolean(false);
+        makeAddEntryTimeout(ml, addEntryFinished);
+
+        // Make the update operation of ledger info failure when switch ledger.
+        metadataStore.failConditional(new 
MetadataStoreException.BadVersionException(""), (opType, path) -> {
+            if (opType == FaultInjectionMetadataStore.OperationType.PUT && 
addEntryFinished.get()
+                    && 
"/managed-ledgers/testLedgerInfoMetaCorrectIfAddEntryTimeOut".equals(path)) {
+                return true;
+            }
+            return false;
+        });
+
+        // Make delete ledger is delayed if delete is called.
+        AtomicBoolean deleteLedgerDelaySignal = new AtomicBoolean(false);
+        DeleteLedgerInfo deleteLedgerInfo =
+                makeDelayIfDoLedgerDelete(ml.currentLedger, 
deleteLedgerDelaySignal, spyBookKeeper);
+
+        // Add one entry.
+        // - it will fail and trigger ledger switch(we mocked the error).
+        // - ledger switch will also fail(we mocked the error).
+        try {
+            ml.addEntry("1".getBytes(Charset.defaultCharset()));
+            fail("Expected the operation of add entry will fail by timeout or 
ledger fenced.");
+        } catch (Exception e){
+            // expected ex.
+        }
+
+        // Reopen ML.
+        try {
+            ml.close();
+            fail("Expected the operation of ml close will fail by fenced 
state.");
+        } catch (Exception e){
+            // expected ex.
+        }
+        ManagedLedgerImpl mlReopened = (ManagedLedgerImpl) 
factory.open(mlName);
+        deleteLedgerDelaySignal.set(true);
+        if (deleteLedgerInfo.hasCalled){
+            deleteLedgerInfo.future.join();
+        }
+        mlReopened.close();
+
+        // verify: all ledgers in ledger info is worked.
+        for (long ledgerId : mlReopened.getLedgersInfo().keySet()){
+            LedgerHandle lh = bkc.openLedger(ledgerId, ml.digestType, 
ml.getConfig().getPassword());
+            lh.close();
+        }
+    }
+
     @Test
     public void managedLedgerApi() throws Exception {
         ManagedLedger ledger = factory.open("my_test_ledger");

Reply via email to