This is an automated email from the ASF dual-hosted git repository. yong pushed a commit to branch branch-4.15 in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
commit 00ccdd71b6d7eb8b1acb90e1271ec6f711e23c5d Author: fengyubiao <[email protected]> AuthorDate: Thu Oct 27 15:06:26 2022 +0800 When call openLedgerOp, make the timeout ex is a separate error code (#3562) Descriptions of the changes in this PR: ### Motivation When we execute `bkClient.openLedger(ledgerId)`, the execution flow is as follows: 1. start opening the ledger 2. get the ledger metadata 3. read the last confirmed entry 4. the ledger is opened successfully If we get the correct ledger metadata at step 2, this means that the ledger has not been deleted. If step 3 times out, we should try again to make sure the ledger exists until we get a clear response from the BK server. <strong>(Highlight)</strong> However, in the current implementation, the timeout exception is rewritten as a `LedgerRecoveryException`, making it impossible to determine whether we should retry. Log: ``` Oct 17, 2022 22:54:05.818 [BookKeeperClientWorker-OrderedExecutor-16-0] ERROR org.apache.bookkeeper.client.ReadLastConfirmedOp - While readLastConfirmed ledger: 59158316 did not hear success responses from all quorums, QuorumCoverage(e:2,w:2,a:2) = [-23, -23] Oct 17, 2022 22:54:05.818 [BookKeeperClientWorker-OrderedExecutor-16-0] INFO org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [order/org-217/persistent/p1010-tx] Opened ledger 59158316 for consumer order-service. 
rc=-10 ``` Looking at the ledger metadata: ``` LedgerMetadata{formatVersion=3, ensembleSize=2, writeQuorumSize=2, ackQuorumSize=2, state=IN_RECOVERY, digestType=CRC32C, password=base64:, ensembles={0=[***:3181, ***:3181]}, customMetadata={component=***, pulsar/managed-ledger=***, pulsar/cursor=***, application=***}} ``` See also: https://github.com/apache/pulsar/pull/18123 ### Changes - When calling openLedgerOp, do not rewrite `TimeoutException` as a `LedgerRecoveryException` - Add the dependency: `junit4-dataprovider` - Use `@DataProvider` to simplify the test cases "testOpenLedgerRecovery" & "testOpenLedgerNoRecovery" [Note: in this cherry-pick the two tests are merged using JUnit's built-in `Parameterized` runner instead, and no `junit4-dataprovider` dependency is added.] (cherry picked from commit ef31c7a37415c063b2f485f63198ec28e8f9f0fa) --- .../org/apache/bookkeeper/client/BKException.java | 2 +- .../org/apache/bookkeeper/client/LedgerOpenOp.java | 22 ++- .../apache/bookkeeper/client/LedgerRecoveryOp.java | 2 + .../api/BookKeeperBuildersOpenLedgerTest.java | 164 +++++++++++++++++++++ .../client/api/BookKeeperBuildersTest.java | 42 ------ 5 files changed, 183 insertions(+), 49 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java index 7435c67edf..a7b51fa2f9 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java @@ -289,7 +289,7 @@ public abstract class BKException extends org.apache.bookkeeper.client.api.BKExc */ public static class BKNoSuchLedgerExistsOnMetadataServerException extends BKException { public BKNoSuchLedgerExistsOnMetadataServerException() { - super(Code.NoSuchLedgerExistsOnMetadataServerException); + super(BKException.Code.NoSuchLedgerExistsOnMetadataServerException); } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java index 
0f688639d2..0b2be9e64c 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java @@ -202,15 +202,18 @@ class LedgerOpenOp { public void safeOperationComplete(int rc, Void result) { if (rc == BKException.Code.OK) { openComplete(BKException.Code.OK, lh); - } else if (rc == BKException.Code.UnauthorizedAccessException) { - closeLedgerHandleAsync().whenComplete((r, ex) -> { + } else { + closeLedgerHandleAsync().whenComplete((ignore, ex) -> { if (ex != null) { LOG.error("Ledger {} close failed", ledgerId, ex); } - openComplete(BKException.Code.UnauthorizedAccessException, null); + if (rc == BKException.Code.UnauthorizedAccessException + || rc == BKException.Code.TimeoutException) { + openComplete(rc, null); + } else { + openComplete(bk.getReturnRc(BKException.Code.LedgerRecoveryException), null); + } }); - } else { - openComplete(bk.getReturnRc(BKException.Code.LedgerRecoveryException), null); } } @Override @@ -223,7 +226,14 @@ class LedgerOpenOp { @Override public void readLastConfirmedComplete(int rc, long lastConfirmed, Object ctx) { - if (rc != BKException.Code.OK) { + if (rc == BKException.Code.TimeoutException) { + closeLedgerHandleAsync().whenComplete((r, ex) -> { + if (ex != null) { + LOG.error("Ledger {} close failed", ledgerId, ex); + } + openComplete(bk.getReturnRc(rc), null); + }); + } else if (rc != BKException.Code.OK) { closeLedgerHandleAsync().whenComplete((r, ex) -> { if (ex != null) { LOG.error("Ledger {} close failed", ledgerId, ex); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java index 08a9f2e553..8fd2bbe3b9 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java @@ -130,6 
+130,8 @@ class LedgerRecoveryOp implements ReadEntryListener, AddCallback { // ledger recovery metadataForRecovery = lh.getLedgerMetadata(); doRecoveryRead(); + } else if (rc == BKException.Code.TimeoutException) { + submitCallback(rc); } else if (rc == BKException.Code.UnauthorizedAccessException) { submitCallback(rc); } else { diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/BookKeeperBuildersOpenLedgerTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/BookKeeperBuildersOpenLedgerTest.java new file mode 100644 index 0000000000..46821cc353 --- /dev/null +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/BookKeeperBuildersOpenLedgerTest.java @@ -0,0 +1,164 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ +package org.apache.bookkeeper.client.api; + +import static org.apache.bookkeeper.common.concurrent.FutureUtils.result; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.anyInt; +import static org.mockito.Mockito.anyLong; +import static org.mockito.Mockito.doAnswer; + +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import org.apache.bookkeeper.client.BKException; +import org.apache.bookkeeper.client.BookKeeper; +import org.apache.bookkeeper.client.LedgerMetadataBuilder; +import org.apache.bookkeeper.client.MockBookKeeperTestCase; +import org.apache.bookkeeper.net.BookieId; +import org.apache.bookkeeper.proto.BookieProtocol; +import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class BookKeeperBuildersOpenLedgerTest extends MockBookKeeperTestCase { + + private static final int ensembleSize = 3; + private static final int writeQuorumSize = 2; + private static final int ackQuorumSize = 1; + private static final long ledgerId = 12342L; + private static final Map<String, byte[]> customMetadata = new HashMap<>(); + private static final byte[] password = new byte[3]; + private static final byte[] entryData = new byte[32]; + + private boolean withRecovery; + + public BookKeeperBuildersOpenLedgerTest(boolean withRecovery) { + this.withRecovery = withRecovery; + } + + @Parameterized.Parameters(name = "withRecovery:({0})") + public static Collection<Object[]> data() { + return Arrays.asList(new Object[][]{ + {true}, + {false} + }); + } + + @Test + public void testOpenLedger() throws Exception { + LedgerMetadata ledgerMetadata = generateLedgerMetadata(ensembleSize, + writeQuorumSize, ackQuorumSize, password, customMetadata); + registerMockLedgerMetadata(ledgerId, ledgerMetadata); + + 
ledgerMetadata.getAllEnsembles().values().forEach(bookieAddressList -> { + bookieAddressList.forEach(bookieAddress -> { + registerMockEntryForRead(ledgerId, BookieProtocol.LAST_ADD_CONFIRMED, bookieAddress, entryData, -1); + registerMockEntryForRead(ledgerId, 0, bookieAddress, entryData, -1); + }); + }); + + result(newOpenLedgerOp() + .withPassword(ledgerMetadata.getPassword()) + .withDigestType(DigestType.CRC32) + .withLedgerId(ledgerId) + .withRecovery(withRecovery) + .execute()); + } + + @Test + public void testOpenLedgerWithTimeoutEx() throws Exception { + mockReadEntryTimeout(); + LedgerMetadata ledgerMetadata = generateLedgerMetadata(ensembleSize, + writeQuorumSize, ackQuorumSize, password, customMetadata); + registerMockLedgerMetadata(ledgerId, ledgerMetadata); + ledgerMetadata.getAllEnsembles().values().forEach(bookieAddressList -> { + bookieAddressList.forEach(bookieAddress -> { + registerMockEntryForRead(ledgerId, BookieProtocol.LAST_ADD_CONFIRMED, bookieAddress, entryData, -1); + registerMockEntryForRead(ledgerId, 0, bookieAddress, entryData, -1); + }); + }); + try { + result(newOpenLedgerOp() + .withPassword(ledgerMetadata.getPassword()) + .withDigestType(DigestType.CRC32) + .withLedgerId(ledgerId) + .withRecovery(withRecovery) + .execute()); + fail("Expect timeout error"); + } catch (BKException.BKTimeoutException timeoutException) { + // Expect timeout error. + } + // Reset bk client. 
+ resetBKClient(); + } + + protected LedgerMetadata generateLedgerMetadata(int ensembleSize, + int writeQuorumSize, int ackQuorumSize, byte[] password, + Map<String, byte[]> customMetadata) throws BKException.BKNotEnoughBookiesException { + return LedgerMetadataBuilder.create() + .withId(12L) + .withEnsembleSize(ensembleSize) + .withWriteQuorumSize(writeQuorumSize) + .withAckQuorumSize(ackQuorumSize) + .withPassword(password) + .withDigestType(BookKeeper.DigestType.CRC32.toApiDigestType()) + .withCustomMetadata(customMetadata) + .withCreationTime(System.currentTimeMillis()) + .newEnsembleEntry(0, generateNewEnsemble(ensembleSize)).build(); + } + + private void mockReadEntryTimeout() { + // Mock read entry. + doAnswer(invocation -> { + long ledgerId = (long) invocation.getArguments()[1]; + long entryId = (long) invocation.getArguments()[2]; + + BookkeeperInternalCallbacks.ReadEntryCallback callback = + (BookkeeperInternalCallbacks.ReadEntryCallback) invocation.getArguments()[3]; + Object ctx = invocation.getArguments()[4]; + callback.readEntryComplete(BKException.Code.TimeoutException, ledgerId, entryId, null, ctx); + return null; + }).when(bookieClient).readEntry(any(BookieId.class), + anyLong(), anyLong(), any(BookkeeperInternalCallbacks.ReadEntryCallback.class), + any(), anyInt(), any()); + // Mock read lac. 
+ doAnswer(invocation -> { + long ledgerId = (long) invocation.getArguments()[1]; + BookkeeperInternalCallbacks.ReadLacCallback callback = + (BookkeeperInternalCallbacks.ReadLacCallback) invocation.getArguments()[2]; + Object ctx = invocation.getArguments()[3]; + callback.readLacComplete(BKException.Code.TimeoutException, ledgerId, null, null, ctx); + return null; + }).when(bookieClient).readLac(any(BookieId.class), + anyLong(), any(BookkeeperInternalCallbacks.ReadLacCallback.class), + any()); + } + + private void resetBKClient() throws Exception { + tearDown(); + setup(); + } +} diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/BookKeeperBuildersTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/BookKeeperBuildersTest.java index 246c3d30db..49c2d17f09 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/BookKeeperBuildersTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/BookKeeperBuildersTest.java @@ -37,7 +37,6 @@ import org.apache.bookkeeper.client.LedgerHandle; import org.apache.bookkeeper.client.LedgerMetadataBuilder; import org.apache.bookkeeper.client.MockBookKeeperTestCase; import org.apache.bookkeeper.conf.ClientConfiguration; -import org.apache.bookkeeper.proto.BookieProtocol; import org.junit.Test; /** @@ -330,47 +329,6 @@ public class BookKeeperBuildersTest extends MockBookKeeperTestCase { .execute()); } - @Test - public void testOpenLedgerNoRecovery() throws Exception { - LedgerMetadata ledgerMetadata = generateLedgerMetadata(ensembleSize, - writeQuorumSize, ackQuorumSize, password, customMetadata); - registerMockLedgerMetadata(ledgerId, ledgerMetadata); - - ledgerMetadata.getAllEnsembles().values().forEach(bookieAddressList -> { - bookieAddressList.forEach(bookieAddress -> { - registerMockEntryForRead(ledgerId, BookieProtocol.LAST_ADD_CONFIRMED, bookieAddress, entryData, -1); - registerMockEntryForRead(ledgerId, 0, bookieAddress, entryData, 
-1); - }); - }); - - result(newOpenLedgerOp() - .withPassword(ledgerMetadata.getPassword()) - .withDigestType(DigestType.CRC32) - .withLedgerId(ledgerId) - .withRecovery(false) - .execute()); - } - - @Test - public void testOpenLedgerRecovery() throws Exception { - LedgerMetadata ledgerMetadata = generateLedgerMetadata(ensembleSize, - writeQuorumSize, ackQuorumSize, password, customMetadata); - registerMockLedgerMetadata(ledgerId, ledgerMetadata); - - ledgerMetadata.getAllEnsembles().values().forEach(bookieAddressList -> { - bookieAddressList.forEach(bookieAddress -> { - registerMockEntryForRead(ledgerId, BookieProtocol.LAST_ADD_CONFIRMED, bookieAddress, entryData, -1); - registerMockEntryForRead(ledgerId, 0, bookieAddress, entryData, -1); - }); - }); - result(newOpenLedgerOp() - .withPassword(ledgerMetadata.getPassword()) - .withDigestType(DigestType.CRC32) - .withLedgerId(ledgerId) - .withRecovery(true) - .execute()); - } - @Test(expected = BKIncorrectParameterException.class) public void testDeleteLedgerNoLedgerId() throws Exception { result(newDeleteLedgerOp()
