This is an automated email from the ASF dual-hosted git repository.

yong pushed a commit to branch branch-4.15
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git

commit e7848a5f1f231bc5bd92170dce464d96b7ef32a2
Author: Enrico Olivelli <[email protected]>
AuthorDate: Thu Oct 6 14:00:14 2022 +0200

    LedgerHandle: do not complete metadata operation on the ZooKeeper/Metadata callback thread (#3516)
    
    (cherry picked from commit 21357087f08b57a530c94591121b9803b5a486e5)
---
 .../apache/bookkeeper/client/LedgerCreateOp.java   | 12 ++++++++++-
 .../org/apache/bookkeeper/client/LedgerHandle.java | 24 +++++++++++++++-------
 .../org/apache/bookkeeper/client/LedgerOpenOp.java | 13 +++++++++++-
 .../bookkeeper/client/BookieWriteLedgerTest.java   |  6 +++---
 .../apache/bookkeeper/client/MockBookKeeper.java   | 17 ++++++++++++---
 .../apache/bookkeeper/client/MockLedgerHandle.java |  5 +++++
 6 files changed, 62 insertions(+), 15 deletions(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
index 7dfdd16161..959664a8f3 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
@@ -47,6 +47,7 @@ import org.apache.bookkeeper.net.BookieId;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.util.MathUtils;
+import org.apache.bookkeeper.util.SafeRunnable;
 import org.apache.bookkeeper.versioning.Versioned;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -272,7 +273,16 @@ class LedgerCreateOp {
         } else {
             
createOpLogger.registerSuccessfulEvent(MathUtils.elapsedNanos(startTime), 
TimeUnit.NANOSECONDS);
         }
-        cb.createComplete(rc, lh, ctx);
+        if (lh != null) { // lh is null in case of errors
+            lh.executeOrdered(new SafeRunnable() {
+                @Override
+                public void safeRun() {
+                    cb.createComplete(rc, lh, ctx);
+                }
+            });
+        } else {
+            cb.createComplete(rc, null, ctx);
+        }
     }
 
     public static class CreateBuilderImpl implements CreateBuilder {
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index 50fc2d6624..1d55bb32ba 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -539,7 +539,7 @@ public class LedgerHandle implements WriteHandle {
      * @param rc
      */
     void doAsyncCloseInternal(final CloseCallback cb, final Object ctx, final 
int rc) {
-        clientCtx.getMainWorkerPool().executeOrdered(ledgerId, new 
SafeRunnable() {
+        executeOrdered(new SafeRunnable() {
             @Override
             public void safeRun() {
                 final HandleState prevHandleState;
@@ -896,7 +896,7 @@ public class LedgerHandle implements WriteHandle {
 
             if (isHandleWritable()) {
                 // Ledger handle in read/write mode: submit to OSE for ordered 
execution.
-                clientCtx.getMainWorkerPool().executeOrdered(ledgerId, op);
+                executeOrdered(op);
             } else {
                 // Read-only ledger handle: bypass OSE and execute read 
directly in client thread.
                 // This avoids a context-switch to OSE thread and thus reduces 
latency.
@@ -1159,7 +1159,7 @@ public class LedgerHandle implements WriteHandle {
         if (wasClosed) {
             // make sure the callback is triggered in main worker pool
             try {
-                clientCtx.getMainWorkerPool().executeOrdered(ledgerId, new 
SafeRunnable() {
+                executeOrdered(new SafeRunnable() {
                     @Override
                     public void safeRun() {
                         LOG.warn("Force() attempted on a closed ledger: {}", 
ledgerId);
@@ -1179,7 +1179,7 @@ public class LedgerHandle implements WriteHandle {
 
         // early exit: no write has been issued yet
         if (pendingAddsSequenceHead == INVALID_ENTRY_ID) {
-            clientCtx.getMainWorkerPool().executeOrdered(ledgerId, new 
SafeRunnable() {
+            executeOrdered(new SafeRunnable() {
                     @Override
                     public void safeRun() {
                         FutureUtils.complete(result, null);
@@ -1194,7 +1194,7 @@ public class LedgerHandle implements WriteHandle {
         }
 
         try {
-            clientCtx.getMainWorkerPool().executeOrdered(ledgerId, op);
+            executeOrdered(op);
         } catch (RejectedExecutionException e) {
             result.completeExceptionally(new 
BKException.BKInterruptedException());
         }
@@ -1328,7 +1328,7 @@ public class LedgerHandle implements WriteHandle {
         if (wasClosed) {
             // make sure the callback is triggered in main worker pool
             try {
-                clientCtx.getMainWorkerPool().executeOrdered(ledgerId, new 
SafeRunnable() {
+                executeOrdered(new SafeRunnable() {
                     @Override
                     public void safeRun() {
                         LOG.warn("Attempt to add to closed ledger: {}", 
ledgerId);
@@ -1363,7 +1363,7 @@ public class LedgerHandle implements WriteHandle {
         }
 
         try {
-            clientCtx.getMainWorkerPool().executeOrdered(ledgerId, op);
+            executeOrdered(op);
         } catch (RejectedExecutionException e) {
             op.cb.addCompleteWithLatency(
                     BookKeeper.getReturnRc(clientCtx.getBookieClient(), 
BKException.Code.InterruptedException),
@@ -2085,4 +2085,14 @@ public class LedgerHandle implements WriteHandle {
             return distributionSchedule.getWriteSet(entryId);
         }
     }
+
+    /**
+     * Execute the callback in the thread pinned to the ledger.
+     * @param runnable
+     * @throws RejectedExecutionException
+     */
+    void executeOrdered(org.apache.bookkeeper.common.util.SafeRunnable 
runnable) throws RejectedExecutionException {
+        clientCtx.getMainWorkerPool().executeOrdered(ledgerId, runnable);
+    }
+
 }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java
index 0b2be9e64c..6ba4a11e3b 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java
@@ -40,6 +40,7 @@ import org.apache.bookkeeper.client.impl.OpenBuilderBase;
 import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.util.MathUtils;
 import org.apache.bookkeeper.util.OrderedGenericCallback;
+import org.apache.bookkeeper.util.SafeRunnable;
 import org.apache.bookkeeper.versioning.Versioned;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -256,7 +257,17 @@ class LedgerOpenOp {
         } else {
             
openOpLogger.registerSuccessfulEvent(MathUtils.elapsedNanos(startTime), 
TimeUnit.NANOSECONDS);
         }
-        cb.openComplete(rc, lh, ctx);
+
+        if (lh != null) { // lh is null in case of errors
+            lh.executeOrdered(new SafeRunnable() {
+                @Override
+                public void safeRun() {
+                    cb.openComplete(rc, lh, ctx);
+                }
+            });
+        } else {
+            cb.openComplete(rc, null, ctx);
+        }
     }
 
     static final class OpenBuilderImpl extends OpenBuilderBase {
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java
index fb9f83a6aa..ce2d6f9b64 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java
@@ -731,7 +731,7 @@ public class BookieWriteLedgerTest extends
                         
.withDigestType(org.apache.bookkeeper.client.api.DigestType.CRC32)
                         
.withPassword(ledgerPassword).makeAdv().withLedgerId(ledgerId)
                         .execute()
-                        .thenApply(writer -> { // Add entries to ledger when 
created
+                        .thenCompose(writer -> { // Add entries to ledger when 
created
                                 LOG.info("Writing stream of {} entries to {}",
                                          numEntriesToWrite, ledgerId);
                                 List<ByteBuf> entries = 
rng.ints(numEntriesToWrite, 0, maxInt)
@@ -750,8 +750,8 @@ public class BookieWriteLedgerTest extends
                                              ledgerId, entryId, 
entry.slice().readInt());
                                     lastRequest = writer.writeAsync(entryId, 
entry);
                                 }
-                                lastRequest.join();
-                                return Pair.of(writer, entries);
+                                return lastRequest
+                                        .thenApply(___ -> Pair.of(writer, 
entries));
                             });
                 })
             .parallel().map(CompletableFuture::join) // wait for all creations 
and adds in parallel
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeper.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeper.java
index 75f3d8eeb9..9f5889f3f4 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeper.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeper.java
@@ -40,6 +40,7 @@ import org.apache.bookkeeper.client.api.OpenBuilder;
 import org.apache.bookkeeper.client.api.ReadHandle;
 import org.apache.bookkeeper.client.impl.OpenBuilderBase;
 import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.util.SafeRunnable;
 import org.apache.zookeeper.ZooKeeper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -107,9 +108,14 @@ public class MockBookKeeper extends BookKeeper {
                     log.info("Creating ledger {}", id);
                     MockLedgerHandle lh = new 
MockLedgerHandle(MockBookKeeper.this, id, digestType, passwd);
                     ledgers.put(id, lh);
-                    cb.createComplete(0, lh, ctx);
+                    lh.executeOrdered(new SafeRunnable() {
+                        @Override
+                        public void safeRun() {
+                            cb.createComplete(0, lh, ctx);
+                        }
+                    });
                 } catch (Throwable t) {
-                    t.printStackTrace();
+                    log.error("Error", t);
                 }
             }
         });
@@ -164,7 +170,12 @@ public class MockBookKeeper extends BookKeeper {
         } else if (!Arrays.equals(lh.passwd, passwd)) {
             cb.openComplete(BKException.Code.UnauthorizedAccessException, 
null, ctx);
         } else {
-            cb.openComplete(0, lh, ctx);
+            lh.executeOrdered(new SafeRunnable() {
+                                  @Override
+                                  public void safeRun() {
+                                      cb.openComplete(0, lh, ctx);
+                                  }
+                              });
         }
     }
 
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockLedgerHandle.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockLedgerHandle.java
index e2b897d841..feca1e8867 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockLedgerHandle.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockLedgerHandle.java
@@ -92,6 +92,11 @@ public class MockLedgerHandle extends LedgerHandle {
 
     }
 
+    @Override
+    void executeOrdered(org.apache.bookkeeper.common.util.SafeRunnable 
runnable) throws RejectedExecutionException {
+        bk.executor.execute(runnable);
+    }
+
     @Override
     public void asyncReadEntries(final long firstEntry, final long lastEntry, 
final ReadCallback cb, final Object ctx) {
         if (bk.isStopped()) {

Reply via email to