This is an automated email from the ASF dual-hosted git repository. yong pushed a commit to branch branch-4.15 in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
commit e7848a5f1f231bc5bd92170dce464d96b7ef32a2 Author: Enrico Olivelli <[email protected]> AuthorDate: Thu Oct 6 14:00:14 2022 +0200 LedgerHandle: do not complete metadata operation on the ZooKeeper/Metadata callback thread (#3516) (cherry picked from commit 21357087f08b57a530c94591121b9803b5a486e5) --- .../apache/bookkeeper/client/LedgerCreateOp.java | 12 ++++++++++- .../org/apache/bookkeeper/client/LedgerHandle.java | 24 +++++++++++++++------- .../org/apache/bookkeeper/client/LedgerOpenOp.java | 13 +++++++++++- .../bookkeeper/client/BookieWriteLedgerTest.java | 6 +++--- .../apache/bookkeeper/client/MockBookKeeper.java | 17 ++++++++++++--- .../apache/bookkeeper/client/MockLedgerHandle.java | 5 +++++ 6 files changed, 62 insertions(+), 15 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java index 7dfdd16161..959664a8f3 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java @@ -47,6 +47,7 @@ import org.apache.bookkeeper.net.BookieId; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback; import org.apache.bookkeeper.stats.OpStatsLogger; import org.apache.bookkeeper.util.MathUtils; +import org.apache.bookkeeper.util.SafeRunnable; import org.apache.bookkeeper.versioning.Versioned; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -272,7 +273,16 @@ class LedgerCreateOp { } else { createOpLogger.registerSuccessfulEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS); } - cb.createComplete(rc, lh, ctx); + if (lh != null) { // lh is null in case of errors + lh.executeOrdered(new SafeRunnable() { + @Override + public void safeRun() { + cb.createComplete(rc, lh, ctx); + } + }); + } else { + cb.createComplete(rc, null, ctx); + } } public static class CreateBuilderImpl 
implements CreateBuilder { diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java index 50fc2d6624..1d55bb32ba 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java @@ -539,7 +539,7 @@ public class LedgerHandle implements WriteHandle { * @param rc */ void doAsyncCloseInternal(final CloseCallback cb, final Object ctx, final int rc) { - clientCtx.getMainWorkerPool().executeOrdered(ledgerId, new SafeRunnable() { + executeOrdered(new SafeRunnable() { @Override public void safeRun() { final HandleState prevHandleState; @@ -896,7 +896,7 @@ public class LedgerHandle implements WriteHandle { if (isHandleWritable()) { // Ledger handle in read/write mode: submit to OSE for ordered execution. - clientCtx.getMainWorkerPool().executeOrdered(ledgerId, op); + executeOrdered(op); } else { // Read-only ledger handle: bypass OSE and execute read directly in client thread. // This avoids a context-switch to OSE thread and thus reduces latency. 
@@ -1159,7 +1159,7 @@ public class LedgerHandle implements WriteHandle { if (wasClosed) { // make sure the callback is triggered in main worker pool try { - clientCtx.getMainWorkerPool().executeOrdered(ledgerId, new SafeRunnable() { + executeOrdered(new SafeRunnable() { @Override public void safeRun() { LOG.warn("Force() attempted on a closed ledger: {}", ledgerId); @@ -1179,7 +1179,7 @@ public class LedgerHandle implements WriteHandle { // early exit: no write has been issued yet if (pendingAddsSequenceHead == INVALID_ENTRY_ID) { - clientCtx.getMainWorkerPool().executeOrdered(ledgerId, new SafeRunnable() { + executeOrdered(new SafeRunnable() { @Override public void safeRun() { FutureUtils.complete(result, null); @@ -1194,7 +1194,7 @@ public class LedgerHandle implements WriteHandle { } try { - clientCtx.getMainWorkerPool().executeOrdered(ledgerId, op); + executeOrdered(op); } catch (RejectedExecutionException e) { result.completeExceptionally(new BKException.BKInterruptedException()); } @@ -1328,7 +1328,7 @@ public class LedgerHandle implements WriteHandle { if (wasClosed) { // make sure the callback is triggered in main worker pool try { - clientCtx.getMainWorkerPool().executeOrdered(ledgerId, new SafeRunnable() { + executeOrdered(new SafeRunnable() { @Override public void safeRun() { LOG.warn("Attempt to add to closed ledger: {}", ledgerId); @@ -1363,7 +1363,7 @@ public class LedgerHandle implements WriteHandle { } try { - clientCtx.getMainWorkerPool().executeOrdered(ledgerId, op); + executeOrdered(op); } catch (RejectedExecutionException e) { op.cb.addCompleteWithLatency( BookKeeper.getReturnRc(clientCtx.getBookieClient(), BKException.Code.InterruptedException), @@ -2085,4 +2085,14 @@ public class LedgerHandle implements WriteHandle { return distributionSchedule.getWriteSet(entryId); } } + + /** + * Execute the callback in the thread pinned to the ledger. 
+ * @param runnable + * @throws RejectedExecutionException + */ + void executeOrdered(org.apache.bookkeeper.common.util.SafeRunnable runnable) throws RejectedExecutionException { + clientCtx.getMainWorkerPool().executeOrdered(ledgerId, runnable); + } + } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java index 0b2be9e64c..6ba4a11e3b 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java @@ -40,6 +40,7 @@ import org.apache.bookkeeper.client.impl.OpenBuilderBase; import org.apache.bookkeeper.stats.OpStatsLogger; import org.apache.bookkeeper.util.MathUtils; import org.apache.bookkeeper.util.OrderedGenericCallback; +import org.apache.bookkeeper.util.SafeRunnable; import org.apache.bookkeeper.versioning.Versioned; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -256,7 +257,17 @@ class LedgerOpenOp { } else { openOpLogger.registerSuccessfulEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS); } - cb.openComplete(rc, lh, ctx); + + if (lh != null) { // lh is null in case of errors + lh.executeOrdered(new SafeRunnable() { + @Override + public void safeRun() { + cb.openComplete(rc, lh, ctx); + } + }); + } else { + cb.openComplete(rc, null, ctx); + } } static final class OpenBuilderImpl extends OpenBuilderBase { diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java index fb9f83a6aa..ce2d6f9b64 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java @@ -731,7 +731,7 @@ public class BookieWriteLedgerTest extends 
.withDigestType(org.apache.bookkeeper.client.api.DigestType.CRC32) .withPassword(ledgerPassword).makeAdv().withLedgerId(ledgerId) .execute() - .thenApply(writer -> { // Add entries to ledger when created + .thenCompose(writer -> { // Add entries to ledger when created LOG.info("Writing stream of {} entries to {}", numEntriesToWrite, ledgerId); List<ByteBuf> entries = rng.ints(numEntriesToWrite, 0, maxInt) @@ -750,8 +750,8 @@ public class BookieWriteLedgerTest extends ledgerId, entryId, entry.slice().readInt()); lastRequest = writer.writeAsync(entryId, entry); } - lastRequest.join(); - return Pair.of(writer, entries); + return lastRequest + .thenApply(___ -> Pair.of(writer, entries)); }); }) .parallel().map(CompletableFuture::join) // wait for all creations and adds in parallel diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeper.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeper.java index 75f3d8eeb9..9f5889f3f4 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeper.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeper.java @@ -40,6 +40,7 @@ import org.apache.bookkeeper.client.api.OpenBuilder; import org.apache.bookkeeper.client.api.ReadHandle; import org.apache.bookkeeper.client.impl.OpenBuilderBase; import org.apache.bookkeeper.conf.ClientConfiguration; +import org.apache.bookkeeper.util.SafeRunnable; import org.apache.zookeeper.ZooKeeper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -107,9 +108,14 @@ public class MockBookKeeper extends BookKeeper { log.info("Creating ledger {}", id); MockLedgerHandle lh = new MockLedgerHandle(MockBookKeeper.this, id, digestType, passwd); ledgers.put(id, lh); - cb.createComplete(0, lh, ctx); + lh.executeOrdered(new SafeRunnable() { + @Override + public void safeRun() { + cb.createComplete(0, lh, ctx); + } + }); } catch (Throwable t) { - t.printStackTrace(); + log.error("Error", t); 
} } }); @@ -164,7 +170,12 @@ public class MockBookKeeper extends BookKeeper { } else if (!Arrays.equals(lh.passwd, passwd)) { cb.openComplete(BKException.Code.UnauthorizedAccessException, null, ctx); } else { - cb.openComplete(0, lh, ctx); + lh.executeOrdered(new SafeRunnable() { + @Override + public void safeRun() { + cb.openComplete(0, lh, ctx); + } + }); } } diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockLedgerHandle.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockLedgerHandle.java index e2b897d841..feca1e8867 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockLedgerHandle.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockLedgerHandle.java @@ -92,6 +92,11 @@ public class MockLedgerHandle extends LedgerHandle { } + @Override + void executeOrdered(org.apache.bookkeeper.common.util.SafeRunnable runnable) throws RejectedExecutionException { + bk.executor.execute(runnable); + } + @Override public void asyncReadEntries(final long firstEntry, final long lastEntry, final ReadCallback cb, final Object ctx) { if (bk.isStopped()) {
