This is an automated email from the ASF dual-hosted git repository.

yong pushed a commit to branch branch-4.15
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git

commit 00ccdd71b6d7eb8b1acb90e1271ec6f711e23c5d
Author: fengyubiao <[email protected]>
AuthorDate: Thu Oct 27 15:06:26 2022 +0800

    When calling openLedgerOp, make the timeout exception a separate error code (#3562)
    
    Descriptions of the changes in this PR:
    
    ### Motivation
    When we execute `bkClient.openLedger(ledgerId)`, the execution flow is as 
follows:
    
    1. start opening the ledger
    2. get ledger meta
    3. read the last confirmed entry
    4. open ledger success
    
    If we get the correct ledgerMeta at step 2, this means that this ledger has 
not been deleted. If step 3 times out, we should try again to make sure the 
ledger exists until we get a clear response from the BK server (this is the 
key point). However, in the current implementation, the timeout exception is 
rewritten as a `LedgerRecoveryException`, making it impossible to determine 
whether we should retry.
    
    Log:
    ```
    Oct 17, 2022 22:54:05.818 [BookKeeperClientWorker-OrderedExecutor-16-0] 
ERROR org.apache.bookkeeper.client.ReadLastConfirmedOp - While 
readLastConfirmed ledger: 59158316 did not hear success responses from all 
quorums, QuorumCoverage(e:2,w:2,a:2) = [-23, -23]
    Oct 17, 2022 22:54:05.818 [BookKeeperClientWorker-OrderedExecutor-16-0] 
INFO org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - 
[order/org-217/persistent/p1010-tx] Opened ledger 59158316 for consumer 
order-service. rc=-10
    ```
    
    Looking at the ledger metadata:
    ```
    LedgerMetadata{formatVersion=3, ensembleSize=2, writeQuorumSize=2, 
ackQuorumSize=2, state=IN_RECOVERY, digestType=CRC32C, password=base64:, 
ensembles={0=[***:3181, ***:3181]}, customMetadata={component=***, 
pulsar/managed-ledger=***, pulsar/cursor=***, application=***}}
    ```
    
    see also: https://github.com/apache/pulsar/pull/18123
    
    ### Changes
    
    - When calling openLedgerOp, do not rewrite `TimeoutException` as a 
`LedgerRecoveryException`
    - add the dependency: `junit4-dataprovider`
    - use `@DataProvider` to simplify the test cases "testOpenLedgerRecovery" & 
"testOpenLedgerNoRecovery"
    
    (cherry picked from commit ef31c7a37415c063b2f485f63198ec28e8f9f0fa)
---
 .../org/apache/bookkeeper/client/BKException.java  |   2 +-
 .../org/apache/bookkeeper/client/LedgerOpenOp.java |  22 ++-
 .../apache/bookkeeper/client/LedgerRecoveryOp.java |   2 +
 .../api/BookKeeperBuildersOpenLedgerTest.java      | 164 +++++++++++++++++++++
 .../client/api/BookKeeperBuildersTest.java         |  42 ------
 5 files changed, 183 insertions(+), 49 deletions(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java
index 7435c67edf..a7b51fa2f9 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java
@@ -289,7 +289,7 @@ public abstract class BKException extends 
org.apache.bookkeeper.client.api.BKExc
      */
     public static class BKNoSuchLedgerExistsOnMetadataServerException extends 
BKException {
         public BKNoSuchLedgerExistsOnMetadataServerException() {
-            super(Code.NoSuchLedgerExistsOnMetadataServerException);
+            
super(BKException.Code.NoSuchLedgerExistsOnMetadataServerException);
         }
     }
 
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java
index 0f688639d2..0b2be9e64c 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java
@@ -202,15 +202,18 @@ class LedgerOpenOp {
                 public void safeOperationComplete(int rc, Void result) {
                     if (rc == BKException.Code.OK) {
                         openComplete(BKException.Code.OK, lh);
-                    } else if (rc == 
BKException.Code.UnauthorizedAccessException) {
-                        closeLedgerHandleAsync().whenComplete((r, ex) -> {
+                    } else {
+                        closeLedgerHandleAsync().whenComplete((ignore, ex) -> {
                             if (ex != null) {
                                 LOG.error("Ledger {} close failed", ledgerId, 
ex);
                             }
-                            
openComplete(BKException.Code.UnauthorizedAccessException, null);
+                            if (rc == 
BKException.Code.UnauthorizedAccessException
+                                    || rc == 
BKException.Code.TimeoutException) {
+                                openComplete(rc, null);
+                            } else {
+                                
openComplete(bk.getReturnRc(BKException.Code.LedgerRecoveryException), null);
+                            }
                         });
-                    } else {
-                        
openComplete(bk.getReturnRc(BKException.Code.LedgerRecoveryException), null);
                     }
                 }
                 @Override
@@ -223,7 +226,14 @@ class LedgerOpenOp {
                 @Override
                 public void readLastConfirmedComplete(int rc,
                         long lastConfirmed, Object ctx) {
-                    if (rc != BKException.Code.OK) {
+                    if (rc == BKException.Code.TimeoutException) {
+                        closeLedgerHandleAsync().whenComplete((r, ex) -> {
+                            if (ex != null) {
+                                LOG.error("Ledger {} close failed", ledgerId, 
ex);
+                            }
+                            openComplete(bk.getReturnRc(rc), null);
+                        });
+                    } else if (rc != BKException.Code.OK) {
                         closeLedgerHandleAsync().whenComplete((r, ex) -> {
                             if (ex != null) {
                                 LOG.error("Ledger {} close failed", ledgerId, 
ex);
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java
index 08a9f2e553..8fd2bbe3b9 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java
@@ -130,6 +130,8 @@ class LedgerRecoveryOp implements ReadEntryListener, 
AddCallback {
                             // ledger recovery
                             metadataForRecovery = lh.getLedgerMetadata();
                             doRecoveryRead();
+                        } else if (rc == BKException.Code.TimeoutException) {
+                            submitCallback(rc);
                         } else if (rc == 
BKException.Code.UnauthorizedAccessException) {
                             submitCallback(rc);
                         } else {
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/BookKeeperBuildersOpenLedgerTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/BookKeeperBuildersOpenLedgerTest.java
new file mode 100644
index 0000000000..46821cc353
--- /dev/null
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/BookKeeperBuildersOpenLedgerTest.java
@@ -0,0 +1,164 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.client.api;
+
+import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyInt;
+import static org.mockito.Mockito.anyLong;
+import static org.mockito.Mockito.doAnswer;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerMetadataBuilder;
+import org.apache.bookkeeper.client.MockBookKeeperTestCase;
+import org.apache.bookkeeper.net.BookieId;
+import org.apache.bookkeeper.proto.BookieProtocol;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class BookKeeperBuildersOpenLedgerTest extends MockBookKeeperTestCase {
+
+    private static final int ensembleSize = 3;
+    private static final int writeQuorumSize = 2;
+    private static final int ackQuorumSize = 1;
+    private static final long ledgerId = 12342L;
+    private static final Map<String, byte[]> customMetadata = new HashMap<>();
+    private static final byte[] password = new byte[3];
+    private static final byte[] entryData = new byte[32];
+
+    private boolean withRecovery;
+
+    public BookKeeperBuildersOpenLedgerTest(boolean withRecovery) {
+        this.withRecovery = withRecovery;
+    }
+
+    @Parameterized.Parameters(name = "withRecovery:({0})")
+    public static Collection<Object[]> data() {
+        return Arrays.asList(new Object[][]{
+                {true},
+                {false}
+        });
+    }
+
+    @Test
+    public void testOpenLedger() throws Exception {
+        LedgerMetadata ledgerMetadata = generateLedgerMetadata(ensembleSize,
+            writeQuorumSize, ackQuorumSize, password, customMetadata);
+        registerMockLedgerMetadata(ledgerId, ledgerMetadata);
+
+        ledgerMetadata.getAllEnsembles().values().forEach(bookieAddressList -> 
{
+            bookieAddressList.forEach(bookieAddress -> {
+                    registerMockEntryForRead(ledgerId, 
BookieProtocol.LAST_ADD_CONFIRMED, bookieAddress, entryData, -1);
+                    registerMockEntryForRead(ledgerId, 0, bookieAddress, 
entryData, -1);
+            });
+        });
+
+        result(newOpenLedgerOp()
+            .withPassword(ledgerMetadata.getPassword())
+            .withDigestType(DigestType.CRC32)
+            .withLedgerId(ledgerId)
+            .withRecovery(withRecovery)
+            .execute());
+    }
+
+    @Test
+    public void testOpenLedgerWithTimeoutEx() throws Exception {
+        mockReadEntryTimeout();
+        LedgerMetadata ledgerMetadata = generateLedgerMetadata(ensembleSize,
+                writeQuorumSize, ackQuorumSize, password, customMetadata);
+        registerMockLedgerMetadata(ledgerId, ledgerMetadata);
+        ledgerMetadata.getAllEnsembles().values().forEach(bookieAddressList -> 
{
+            bookieAddressList.forEach(bookieAddress -> {
+                registerMockEntryForRead(ledgerId, 
BookieProtocol.LAST_ADD_CONFIRMED, bookieAddress, entryData, -1);
+                registerMockEntryForRead(ledgerId, 0, bookieAddress, 
entryData, -1);
+            });
+        });
+        try {
+            result(newOpenLedgerOp()
+                .withPassword(ledgerMetadata.getPassword())
+                .withDigestType(DigestType.CRC32)
+                .withLedgerId(ledgerId)
+                .withRecovery(withRecovery)
+                .execute());
+            fail("Expect timeout error");
+        } catch (BKException.BKTimeoutException timeoutException) {
+            // Expect timeout error.
+        }
+        // Reset bk client.
+        resetBKClient();
+    }
+
+    protected LedgerMetadata generateLedgerMetadata(int ensembleSize,
+        int writeQuorumSize, int ackQuorumSize, byte[] password,
+        Map<String, byte[]> customMetadata) throws 
BKException.BKNotEnoughBookiesException {
+        return LedgerMetadataBuilder.create()
+            .withId(12L)
+            .withEnsembleSize(ensembleSize)
+            .withWriteQuorumSize(writeQuorumSize)
+            .withAckQuorumSize(ackQuorumSize)
+            .withPassword(password)
+            .withDigestType(BookKeeper.DigestType.CRC32.toApiDigestType())
+            .withCustomMetadata(customMetadata)
+            .withCreationTime(System.currentTimeMillis())
+            .newEnsembleEntry(0, generateNewEnsemble(ensembleSize)).build();
+    }
+
+    private void mockReadEntryTimeout() {
+        // Mock read entry.
+        doAnswer(invocation -> {
+            long ledgerId = (long) invocation.getArguments()[1];
+            long entryId = (long) invocation.getArguments()[2];
+
+            BookkeeperInternalCallbacks.ReadEntryCallback callback =
+                    (BookkeeperInternalCallbacks.ReadEntryCallback) 
invocation.getArguments()[3];
+            Object ctx = invocation.getArguments()[4];
+            callback.readEntryComplete(BKException.Code.TimeoutException, 
ledgerId, entryId, null, ctx);
+            return null;
+        }).when(bookieClient).readEntry(any(BookieId.class),
+                anyLong(), anyLong(), 
any(BookkeeperInternalCallbacks.ReadEntryCallback.class),
+                any(), anyInt(), any());
+        // Mock read lac.
+        doAnswer(invocation -> {
+            long ledgerId = (long) invocation.getArguments()[1];
+            BookkeeperInternalCallbacks.ReadLacCallback callback =
+                    (BookkeeperInternalCallbacks.ReadLacCallback) 
invocation.getArguments()[2];
+            Object ctx = invocation.getArguments()[3];
+            callback.readLacComplete(BKException.Code.TimeoutException, 
ledgerId, null, null, ctx);
+            return null;
+        }).when(bookieClient).readLac(any(BookieId.class),
+                anyLong(), 
any(BookkeeperInternalCallbacks.ReadLacCallback.class),
+                any());
+    }
+
+    private void resetBKClient() throws Exception {
+        tearDown();
+        setup();
+    }
+}
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/BookKeeperBuildersTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/BookKeeperBuildersTest.java
index 246c3d30db..49c2d17f09 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/BookKeeperBuildersTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/BookKeeperBuildersTest.java
@@ -37,7 +37,6 @@ import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.client.LedgerMetadataBuilder;
 import org.apache.bookkeeper.client.MockBookKeeperTestCase;
 import org.apache.bookkeeper.conf.ClientConfiguration;
-import org.apache.bookkeeper.proto.BookieProtocol;
 import org.junit.Test;
 
 /**
@@ -330,47 +329,6 @@ public class BookKeeperBuildersTest extends 
MockBookKeeperTestCase {
             .execute());
     }
 
-    @Test
-    public void testOpenLedgerNoRecovery() throws Exception {
-        LedgerMetadata ledgerMetadata = generateLedgerMetadata(ensembleSize,
-            writeQuorumSize, ackQuorumSize, password, customMetadata);
-        registerMockLedgerMetadata(ledgerId, ledgerMetadata);
-
-        ledgerMetadata.getAllEnsembles().values().forEach(bookieAddressList -> 
{
-            bookieAddressList.forEach(bookieAddress -> {
-                    registerMockEntryForRead(ledgerId, 
BookieProtocol.LAST_ADD_CONFIRMED, bookieAddress, entryData, -1);
-                    registerMockEntryForRead(ledgerId, 0, bookieAddress, 
entryData, -1);
-            });
-        });
-
-        result(newOpenLedgerOp()
-            .withPassword(ledgerMetadata.getPassword())
-            .withDigestType(DigestType.CRC32)
-            .withLedgerId(ledgerId)
-            .withRecovery(false)
-            .execute());
-    }
-
-    @Test
-    public void testOpenLedgerRecovery() throws Exception {
-        LedgerMetadata ledgerMetadata = generateLedgerMetadata(ensembleSize,
-            writeQuorumSize, ackQuorumSize, password, customMetadata);
-        registerMockLedgerMetadata(ledgerId, ledgerMetadata);
-
-        ledgerMetadata.getAllEnsembles().values().forEach(bookieAddressList -> 
{
-            bookieAddressList.forEach(bookieAddress -> {
-                registerMockEntryForRead(ledgerId, 
BookieProtocol.LAST_ADD_CONFIRMED, bookieAddress, entryData, -1);
-                registerMockEntryForRead(ledgerId, 0, bookieAddress, 
entryData, -1);
-            });
-        });
-        result(newOpenLedgerOp()
-            .withPassword(ledgerMetadata.getPassword())
-            .withDigestType(DigestType.CRC32)
-            .withLedgerId(ledgerId)
-            .withRecovery(true)
-            .execute());
-    }
-
     @Test(expected = BKIncorrectParameterException.class)
     public void testDeleteLedgerNoLedgerId() throws Exception {
         result(newDeleteLedgerOp()

Reply via email to