This is an automated email from the ASF dual-hosted git repository.

yong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 2e1a2f0acd Rename success with writableResult and update final 
writableResult about wait writeSet (#3505)
2e1a2f0acd is described below

commit 2e1a2f0acd56e7f327e7fddd17d8a39d8f8fdbdf
Author: wenbingshen <[email protected]>
AuthorDate: Wed Oct 19 14:30:05 2022 +0800

    Rename success with writableResult and update final writableResult about 
wait writeSet (#3505)
    
    ### Changes
    update `success` about wait writeSet for Writable result.
---
 .../org/apache/bookkeeper/client/LedgerHandle.java | 19 +++++------
 .../apache/bookkeeper/client/SlowBookieTest.java   | 37 ++++++++++++++++++++++
 2 files changed, 47 insertions(+), 9 deletions(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index 1e34f9f977..da720f111f 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -1255,14 +1255,14 @@ public class LedgerHandle implements WriteHandle {
         }
 
         final long startTime = MathUtils.nowInNano();
-        boolean success = isWriteSetWritable(writeSet, 
allowedNonWritableCount);
+        boolean writableResult = isWriteSetWritable(writeSet, 
allowedNonWritableCount);
 
-        if (!success && durationMs > 0) {
+        if (!writableResult && durationMs > 0) {
             int backoff = 1;
             final int maxBackoff = 4;
             final long deadline = startTime + 
TimeUnit.MILLISECONDS.toNanos(durationMs);
 
-            while (!isWriteSetWritable(writeSet, allowedNonWritableCount)) {
+            while (!(writableResult = isWriteSetWritable(writeSet, 
allowedNonWritableCount))) {
                 if (MathUtils.nowInNano() < deadline) {
                     long maxSleep = MathUtils.elapsedMSec(startTime);
                     if (maxSleep < 0) {
@@ -1274,32 +1274,33 @@ public class LedgerHandle implements WriteHandle {
                         TimeUnit.MILLISECONDS.sleep(sleepMs);
                     } catch (InterruptedException e) {
                         Thread.currentThread().interrupt();
-                        success = isWriteSetWritable(writeSet, 
allowedNonWritableCount);
+                        writableResult = isWriteSetWritable(writeSet, 
allowedNonWritableCount);
                         break;
                     }
                     if (backoff <= maxBackoff) {
                         backoff++;
                     }
                 } else {
-                    success = false;
+                    writableResult = false;
                     break;
                 }
             }
             if (backoff > 1) {
-                LOG.info("Spent {} ms waiting for {} writable channels",
+                LOG.info("Spent {} ms waiting for {} writable channels, 
writable result {}",
                         MathUtils.elapsedMSec(startTime),
-                        writeSet.size() - allowedNonWritableCount);
+                        writeSet.size() - allowedNonWritableCount,
+                        writableResult);
             }
         }
 
-        if (success) {
+        if (writableResult) {
             clientChannelWriteWaitStats.registerSuccessfulEvent(
                     MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
         } else {
             clientChannelWriteWaitStats.registerFailedEvent(
                     MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
         }
-        return success;
+        return writableResult;
     }
 
     protected void doAsyncAddEntry(final PendingAddOp op) {
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/SlowBookieTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/SlowBookieTest.java
index 77b84a7f14..619f766750 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/SlowBookieTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/SlowBookieTest.java
@@ -37,6 +37,7 @@ import org.apache.bookkeeper.net.BookieId;
 import org.apache.bookkeeper.proto.BookieClientImpl;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.awaitility.Awaitility;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -390,4 +391,40 @@ public class SlowBookieTest extends 
BookKeeperClusterTestCase {
         checklatch.await();
         assertEquals("There should be no missing fragments", 0, 
numFragments.get());
     }
+
+    @Test
+    public void testWaitForWritable() throws Exception {
+        final ClientConfiguration conf = new ClientConfiguration();
+        conf.setMetadataServiceUri(zkUtil.getMetadataServiceUri());
+        BookKeeper bkc = new BookKeeper(conf);
+
+        byte[] pwd = new byte[]{};
+        try (LedgerHandle lh = bkc.createLedger(1, 1, 1, 
BookKeeper.DigestType.CRC32, pwd)) {
+            long entryId = lh.addEntry(this.entry);
+
+            RoundRobinDistributionSchedule schedule = new 
RoundRobinDistributionSchedule(1, 1, 1);
+            DistributionSchedule.WriteSet writeSet = 
schedule.getWriteSet(entryId);
+
+            int slowBookieIndex = 
writeSet.get(ThreadLocalRandom.current().nextInt(writeSet.size()));
+            List<BookieId> curEns = lh.getCurrentEnsemble();
+
+            // disable channel writable
+            setTargetChannelState(bkc, curEns.get(slowBookieIndex), 0, false);
+
+            AtomicBoolean isWriteable = new AtomicBoolean(false);
+            final long timeout = 10000;
+
+            // waitForWritable async
+           new Thread(() -> {
+                isWriteable.set(lh.waitForWritable(writeSet, 0, timeout));
+            }).start();
+            TimeUnit.MILLISECONDS.sleep(5000);
+            assertFalse(isWriteable.get());
+
+            // enable channel writable
+            setTargetChannelState(bkc, curEns.get(slowBookieIndex), 0, true);
+            Awaitility.await().untilAsserted(() -> 
assertTrue(isWriteable.get()));
+        }
+    }
+
 }

Reply via email to