This is an automated email from the ASF dual-hosted git repository.
yong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 2e1a2f0acd Rename success with writableResult and update final
writableResult about wait writeSet (#3505)
2e1a2f0acd is described below
commit 2e1a2f0acd56e7f327e7fddd17d8a39d8f8fdbdf
Author: wenbingshen <[email protected]>
AuthorDate: Wed Oct 19 14:30:05 2022 +0800
Rename success with writableResult and update final writableResult about
wait writeSet (#3505)
### Changes
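Rename the local variable `success` to `writableResult` in `LedgerHandle`, and
assign it the result of every `isWriteSetWritable` check made while waiting on
the write set. Previously the flag was not updated when the write set became
writable during the wait, so the method could return `false` (and record a
failed event) even though the wait had succeeded. The final writable result is
now also included in the "Spent {} ms waiting ..." log message.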
---
.../org/apache/bookkeeper/client/LedgerHandle.java | 19 +++++------
.../apache/bookkeeper/client/SlowBookieTest.java | 37 ++++++++++++++++++++++
2 files changed, 47 insertions(+), 9 deletions(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index 1e34f9f977..da720f111f 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -1255,14 +1255,14 @@ public class LedgerHandle implements WriteHandle {
}
final long startTime = MathUtils.nowInNano();
- boolean success = isWriteSetWritable(writeSet,
allowedNonWritableCount);
+ boolean writableResult = isWriteSetWritable(writeSet,
allowedNonWritableCount);
- if (!success && durationMs > 0) {
+ if (!writableResult && durationMs > 0) {
int backoff = 1;
final int maxBackoff = 4;
final long deadline = startTime +
TimeUnit.MILLISECONDS.toNanos(durationMs);
- while (!isWriteSetWritable(writeSet, allowedNonWritableCount)) {
+ while (!(writableResult = isWriteSetWritable(writeSet,
allowedNonWritableCount))) {
if (MathUtils.nowInNano() < deadline) {
long maxSleep = MathUtils.elapsedMSec(startTime);
if (maxSleep < 0) {
@@ -1274,32 +1274,33 @@ public class LedgerHandle implements WriteHandle {
TimeUnit.MILLISECONDS.sleep(sleepMs);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
- success = isWriteSetWritable(writeSet,
allowedNonWritableCount);
+ writableResult = isWriteSetWritable(writeSet,
allowedNonWritableCount);
break;
}
if (backoff <= maxBackoff) {
backoff++;
}
} else {
- success = false;
+ writableResult = false;
break;
}
}
if (backoff > 1) {
- LOG.info("Spent {} ms waiting for {} writable channels",
+ LOG.info("Spent {} ms waiting for {} writable channels,
writable result {}",
MathUtils.elapsedMSec(startTime),
- writeSet.size() - allowedNonWritableCount);
+ writeSet.size() - allowedNonWritableCount,
+ writableResult);
}
}
- if (success) {
+ if (writableResult) {
clientChannelWriteWaitStats.registerSuccessfulEvent(
MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
} else {
clientChannelWriteWaitStats.registerFailedEvent(
MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
}
- return success;
+ return writableResult;
}
protected void doAsyncAddEntry(final PendingAddOp op) {
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/SlowBookieTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/SlowBookieTest.java
index 77b84a7f14..619f766750 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/SlowBookieTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/SlowBookieTest.java
@@ -37,6 +37,7 @@ import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.proto.BookieClientImpl;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.awaitility.Awaitility;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -390,4 +391,40 @@ public class SlowBookieTest extends
BookKeeperClusterTestCase {
checklatch.await();
assertEquals("There should be no missing fragments", 0,
numFragments.get());
}
+
+ @Test
+ public void testWaitForWritable() throws Exception {
+ final ClientConfiguration conf = new ClientConfiguration();
+ conf.setMetadataServiceUri(zkUtil.getMetadataServiceUri());
+ BookKeeper bkc = new BookKeeper(conf);
+
+ byte[] pwd = new byte[]{};
+ try (LedgerHandle lh = bkc.createLedger(1, 1, 1,
BookKeeper.DigestType.CRC32, pwd)) {
+ long entryId = lh.addEntry(this.entry);
+
+ RoundRobinDistributionSchedule schedule = new
RoundRobinDistributionSchedule(1, 1, 1);
+ DistributionSchedule.WriteSet writeSet =
schedule.getWriteSet(entryId);
+
+ int slowBookieIndex =
writeSet.get(ThreadLocalRandom.current().nextInt(writeSet.size()));
+ List<BookieId> curEns = lh.getCurrentEnsemble();
+
+ // disable channel writable
+ setTargetChannelState(bkc, curEns.get(slowBookieIndex), 0, false);
+
+ AtomicBoolean isWriteable = new AtomicBoolean(false);
+ final long timeout = 10000;
+
+ // waitForWritable async
+ new Thread(() -> {
+ isWriteable.set(lh.waitForWritable(writeSet, 0, timeout));
+ }).start();
+ TimeUnit.MILLISECONDS.sleep(5000);
+ assertFalse(isWriteable.get());
+
+ // enable channel writable
+ setTargetChannelState(bkc, curEns.get(slowBookieIndex), 0, true);
+ Awaitility.await().untilAsserted(() ->
assertTrue(isWriteable.get()));
+ }
+ }
+
}