This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 21357087f0 LedgerHandle: do not complete metadata operation on the
ZookKeeper/Metadata callback thread (#3516)
21357087f0 is described below
commit 21357087f08b57a530c94591121b9803b5a486e5
Author: Enrico Olivelli <[email protected]>
AuthorDate: Thu Oct 6 14:00:14 2022 +0200
LedgerHandle: do not complete metadata operation on the ZookKeeper/Metadata
callback thread (#3516)
---
.../apache/bookkeeper/client/LedgerCreateOp.java | 12 ++++++++++-
.../org/apache/bookkeeper/client/LedgerHandle.java | 24 +++++++++++++++-------
.../org/apache/bookkeeper/client/LedgerOpenOp.java | 13 +++++++++++-
.../bookkeeper/client/BookieWriteLedgerTest.java | 6 +++---
.../apache/bookkeeper/client/MockBookKeeper.java | 17 ++++++++++++---
.../apache/bookkeeper/client/MockLedgerHandle.java | 5 +++++
6 files changed, 62 insertions(+), 15 deletions(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
index 97b9366ac1..cabe53e8f0 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
@@ -46,6 +46,7 @@ import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.util.MathUtils;
+import org.apache.bookkeeper.util.SafeRunnable;
import org.apache.bookkeeper.versioning.Versioned;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -271,7 +272,16 @@ class LedgerCreateOp {
} else {
createOpLogger.registerSuccessfulEvent(MathUtils.elapsedNanos(startTime),
TimeUnit.NANOSECONDS);
}
- cb.createComplete(rc, lh, ctx);
+ if (lh != null) { // lh is null in case of errors
+ lh.executeOrdered(new SafeRunnable() {
+ @Override
+ public void safeRun() {
+ cb.createComplete(rc, lh, ctx);
+ }
+ });
+ } else {
+ cb.createComplete(rc, null, ctx);
+ }
}
public static class CreateBuilderImpl implements CreateBuilder {
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index b043caa645..1e34f9f977 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -538,7 +538,7 @@ public class LedgerHandle implements WriteHandle {
* @param rc
*/
void doAsyncCloseInternal(final CloseCallback cb, final Object ctx, final
int rc) {
- clientCtx.getMainWorkerPool().executeOrdered(ledgerId, new
SafeRunnable() {
+ executeOrdered(new SafeRunnable() {
@Override
public void safeRun() {
final HandleState prevHandleState;
@@ -895,7 +895,7 @@ public class LedgerHandle implements WriteHandle {
if (isHandleWritable()) {
// Ledger handle in read/write mode: submit to OSE for ordered
execution.
- clientCtx.getMainWorkerPool().executeOrdered(ledgerId, op);
+ executeOrdered(op);
} else {
// Read-only ledger handle: bypass OSE and execute read
directly in client thread.
// This avoids a context-switch to OSE thread and thus reduces
latency.
@@ -1158,7 +1158,7 @@ public class LedgerHandle implements WriteHandle {
if (wasClosed) {
// make sure the callback is triggered in main worker pool
try {
- clientCtx.getMainWorkerPool().executeOrdered(ledgerId, new
SafeRunnable() {
+ executeOrdered(new SafeRunnable() {
@Override
public void safeRun() {
LOG.warn("Force() attempted on a closed ledger: {}",
ledgerId);
@@ -1178,7 +1178,7 @@ public class LedgerHandle implements WriteHandle {
// early exit: no write has been issued yet
if (pendingAddsSequenceHead == INVALID_ENTRY_ID) {
- clientCtx.getMainWorkerPool().executeOrdered(ledgerId, new
SafeRunnable() {
+ executeOrdered(new SafeRunnable() {
@Override
public void safeRun() {
FutureUtils.complete(result, null);
@@ -1193,7 +1193,7 @@ public class LedgerHandle implements WriteHandle {
}
try {
- clientCtx.getMainWorkerPool().executeOrdered(ledgerId, op);
+ executeOrdered(op);
} catch (RejectedExecutionException e) {
result.completeExceptionally(new
BKException.BKInterruptedException());
}
@@ -1326,7 +1326,7 @@ public class LedgerHandle implements WriteHandle {
if (wasClosed) {
// make sure the callback is triggered in main worker pool
try {
- clientCtx.getMainWorkerPool().executeOrdered(ledgerId, new
SafeRunnable() {
+ executeOrdered(new SafeRunnable() {
@Override
public void safeRun() {
LOG.warn("Attempt to add to closed ledger: {}",
ledgerId);
@@ -1361,7 +1361,7 @@ public class LedgerHandle implements WriteHandle {
}
try {
- clientCtx.getMainWorkerPool().executeOrdered(ledgerId, op);
+ executeOrdered(op);
} catch (RejectedExecutionException e) {
op.cb.addCompleteWithLatency(
BookKeeper.getReturnRc(clientCtx.getBookieClient(),
BKException.Code.InterruptedException),
@@ -2083,4 +2083,14 @@ public class LedgerHandle implements WriteHandle {
return distributionSchedule.getWriteSet(entryId);
}
}
+
+ /**
+ * Execute the callback in the thread pinned to the ledger.
+ * @param runnable
+ * @throws RejectedExecutionException
+ */
+ void executeOrdered(org.apache.bookkeeper.common.util.SafeRunnable
runnable) throws RejectedExecutionException {
+ clientCtx.getMainWorkerPool().executeOrdered(ledgerId, runnable);
+ }
+
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java
index 3df610c936..242e0c4a97 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java
@@ -39,6 +39,7 @@ import org.apache.bookkeeper.client.impl.OpenBuilderBase;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.util.MathUtils;
import org.apache.bookkeeper.util.OrderedGenericCallback;
+import org.apache.bookkeeper.util.SafeRunnable;
import org.apache.bookkeeper.versioning.Versioned;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -244,7 +245,17 @@ class LedgerOpenOp {
} else {
openOpLogger.registerSuccessfulEvent(MathUtils.elapsedNanos(startTime),
TimeUnit.NANOSECONDS);
}
- cb.openComplete(rc, lh, ctx);
+
+ if (lh != null) { // lh is null in case of errors
+ lh.executeOrdered(new SafeRunnable() {
+ @Override
+ public void safeRun() {
+ cb.openComplete(rc, lh, ctx);
+ }
+ });
+ } else {
+ cb.openComplete(rc, null, ctx);
+ }
}
static final class OpenBuilderImpl extends OpenBuilderBase {
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java
index 1acb8ae8ff..f727a70561 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java
@@ -732,7 +732,7 @@ public class BookieWriteLedgerTest extends
.withDigestType(org.apache.bookkeeper.client.api.DigestType.CRC32)
.withPassword(ledgerPassword).makeAdv().withLedgerId(ledgerId)
.execute()
- .thenApply(writer -> { // Add entries to ledger when
created
+ .thenCompose(writer -> { // Add entries to ledger when
created
LOG.info("Writing stream of {} entries to {}",
numEntriesToWrite, ledgerId);
List<ByteBuf> entries =
rng.ints(numEntriesToWrite, 0, maxInt)
@@ -751,8 +751,8 @@ public class BookieWriteLedgerTest extends
ledgerId, entryId,
entry.slice().readInt());
lastRequest = writer.writeAsync(entryId,
entry);
}
- lastRequest.join();
- return Pair.of(writer, entries);
+ return lastRequest
+ .thenApply(___ -> Pair.of(writer,
entries));
});
})
.parallel().map(CompletableFuture::join) // wait for all creations
and adds in parallel
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeper.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeper.java
index 7a54fa50b3..985ccd38bd 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeper.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeper.java
@@ -38,6 +38,7 @@ import org.apache.bookkeeper.client.api.OpenBuilder;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.client.impl.OpenBuilderBase;
import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.util.SafeRunnable;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -105,9 +106,14 @@ public class MockBookKeeper extends BookKeeper {
log.info("Creating ledger {}", id);
MockLedgerHandle lh = new
MockLedgerHandle(MockBookKeeper.this, id, digestType, passwd);
ledgers.put(id, lh);
- cb.createComplete(0, lh, ctx);
+ lh.executeOrdered(new SafeRunnable() {
+ @Override
+ public void safeRun() {
+ cb.createComplete(0, lh, ctx);
+ }
+ });
} catch (Throwable t) {
- t.printStackTrace();
+ log.error("Error", t);
}
}
});
@@ -162,7 +168,12 @@ public class MockBookKeeper extends BookKeeper {
} else if (!Arrays.equals(lh.passwd, passwd)) {
cb.openComplete(BKException.Code.UnauthorizedAccessException,
null, ctx);
} else {
- cb.openComplete(0, lh, ctx);
+ lh.executeOrdered(new SafeRunnable() {
+ @Override
+ public void safeRun() {
+ cb.openComplete(0, lh, ctx);
+ }
+ });
}
}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockLedgerHandle.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockLedgerHandle.java
index 0c138a1430..5eab26a696 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockLedgerHandle.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockLedgerHandle.java
@@ -89,6 +89,11 @@ public class MockLedgerHandle extends LedgerHandle {
}
+ @Override
+ void executeOrdered(org.apache.bookkeeper.common.util.SafeRunnable
runnable) throws RejectedExecutionException {
+ bk.executor.execute(runnable);
+ }
+
@Override
public void asyncReadEntries(final long firstEntry, final long lastEntry,
final ReadCallback cb, final Object ctx) {
if (bk.isStopped()) {