This is an automated email from the ASF dual-hosted git repository. yong pushed a commit to branch branch-4.15 in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
commit d1f72b4ac6bd411927c59a29c28870a61f465371 Author: wenbingshen <[email protected]> AuthorDate: Wed Oct 19 14:30:05 2022 +0800 Rename success with writableResult and update final writableResult about wait writeSet (#3505) ### Changes update `success` about wait writeSet for Writable result. (cherry picked from commit 2e1a2f0acd56e7f327e7fddd17d8a39d8f8fdbdf) --- .../org/apache/bookkeeper/client/LedgerHandle.java | 19 +++++------ .../apache/bookkeeper/client/SlowBookieTest.java | 37 ++++++++++++++++++++++ 2 files changed, 47 insertions(+), 9 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java index 316cc7d32d..50fc2d6624 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java @@ -1256,14 +1256,14 @@ public class LedgerHandle implements WriteHandle { } final long startTime = MathUtils.nowInNano(); - boolean success = isWriteSetWritable(writeSet, allowedNonWritableCount); + boolean writableResult = isWriteSetWritable(writeSet, allowedNonWritableCount); - if (!success && durationMs > 0) { + if (!writableResult && durationMs > 0) { int backoff = 1; final int maxBackoff = 4; final long deadline = startTime + TimeUnit.MILLISECONDS.toNanos(durationMs); - while (!isWriteSetWritable(writeSet, allowedNonWritableCount)) { + while (!(writableResult = isWriteSetWritable(writeSet, allowedNonWritableCount))) { if (MathUtils.nowInNano() < deadline) { long maxSleep = MathUtils.elapsedMSec(startTime); if (maxSleep < 0) { @@ -1275,32 +1275,33 @@ public class LedgerHandle implements WriteHandle { TimeUnit.MILLISECONDS.sleep(sleepMs); } catch (InterruptedException e) { Thread.currentThread().interrupt(); - success = isWriteSetWritable(writeSet, allowedNonWritableCount); + writableResult = isWriteSetWritable(writeSet, allowedNonWritableCount); break; } if (backoff <= maxBackoff) { backoff++; } } else { - success = false; + writableResult = false; break; } } if (backoff > 1) { - LOG.info("Spent {} ms waiting for {} writable channels", + LOG.info("Spent {} ms waiting for {} writable channels, writable result {}", MathUtils.elapsedMSec(startTime), - writeSet.size() - allowedNonWritableCount); + writeSet.size() - allowedNonWritableCount, + writableResult); } } - if (success) { + if (writableResult) { clientChannelWriteWaitStats.registerSuccessfulEvent( MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS); } else { clientChannelWriteWaitStats.registerFailedEvent( MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS); } - return success; + return writableResult; } protected void doAsyncAddEntry(final PendingAddOp op) { diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/SlowBookieTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/SlowBookieTest.java index 02ecf45ee5..b8e5d2495d 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/SlowBookieTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/SlowBookieTest.java @@ -38,6 +38,7 @@ import org.apache.bookkeeper.net.BookieId; import org.apache.bookkeeper.proto.BookieClientImpl; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback; import org.apache.bookkeeper.test.BookKeeperClusterTestCase; +import org.awaitility.Awaitility; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -392,4 +393,40 @@ public class SlowBookieTest extends BookKeeperClusterTestCase { checklatch.await(); assertEquals("There should be no missing fragments", 0, numFragments.get()); } + + @Test + public void testWaitForWritable() throws Exception { + final ClientConfiguration conf = new ClientConfiguration(); + conf.setMetadataServiceUri(zkUtil.getMetadataServiceUri()); + BookKeeper bkc = new BookKeeper(conf); + + byte[] pwd = new byte[]{}; + try (LedgerHandle lh = bkc.createLedger(1, 1, 1, BookKeeper.DigestType.CRC32, pwd)) { + long entryId = lh.addEntry(this.entry); + + RoundRobinDistributionSchedule schedule = new RoundRobinDistributionSchedule(1, 1, 1); + DistributionSchedule.WriteSet writeSet = schedule.getWriteSet(entryId); + + int slowBookieIndex = writeSet.get(ThreadLocalRandom.current().nextInt(writeSet.size())); + List<BookieId> curEns = lh.getCurrentEnsemble(); + + // disable channel writable + setTargetChannelState(bkc, curEns.get(slowBookieIndex), 0, false); + + AtomicBoolean isWriteable = new AtomicBoolean(false); + final long timeout = 10000; + + // waitForWritable async + new Thread(() -> { + isWriteable.set(lh.waitForWritable(writeSet, 0, timeout)); + }).start(); + TimeUnit.MILLISECONDS.sleep(5000); + assertFalse(isWriteable.get()); + + // enable channel writable + setTargetChannelState(bkc, curEns.get(slowBookieIndex), 0, true); + Awaitility.await().untilAsserted(() -> assertTrue(isWriteable.get())); + } + } + }
