merlimat commented on a change in pull request #11387:
URL: https://github.com/apache/pulsar/pull/11387#discussion_r693112234



##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
##########
@@ -252,7 +253,8 @@
     protected volatile State state = null;
 
     private final OrderedScheduler scheduledExecutor;
-    private final OrderedExecutor executor;
+    private final ScheduledExecutorService pinnedScheduledExecutor;
+    private final Executor pinnedExecutor;

Review comment:
       If we are using 2 threads, one with the regular executor (which is more 
efficient) and the other for the `pinnedScheduledExecutor`, wouldn't that mean 
that we still have more than 1 thread accessing some of the objects?
   
   Would it make sense to use the generic `scheduledExecutor` (just for 
deferring purposes) and then jump back into the same `pinnedExecutor`?
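   A minimal sketch of the suggested pattern, using plain `java.util.concurrent` stand-ins rather than Pulsar's actual `OrderedScheduler`/`OrderedExecutor` types (the names `scheduledExecutor` and `pinnedExecutor` here are illustrative): the shared scheduler is used only for the delay, and the actual work hops back onto the single pinned thread that owns the state.

```java
import java.util.concurrent.*;

public class DeferredHop {
    public static void main(String[] args) throws Exception {
        // Generic shared scheduler, used only to defer the task.
        ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(2);
        // Pinned executor: the single thread that owns the mutable state.
        ExecutorService pinnedExecutor =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "pinned"));

        CountDownLatch done = new CountDownLatch(1);
        // Defer on the shared scheduler, then jump back into the pinned
        // executor so the work still runs on the owning thread.
        scheduledExecutor.schedule(() -> pinnedExecutor.execute(() -> {
            System.out.println(Thread.currentThread().getName());
            done.countDown();
        }), 50, TimeUnit.MILLISECONDS);

        done.await();
        scheduledExecutor.shutdown();
        pinnedExecutor.shutdown();
    }
}
```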

##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
##########
@@ -2159,7 +2162,7 @@ void notifyCursors() {
                 break;
             }
 
-            executor.execute(safeRun(waitingCursor::notifyEntriesAvailable));
+            pinnedExecutor.execute(safeRun(waitingCursor::notifyEntriesAvailable));

Review comment:
       Is this required to be on the same executor? 
   
   We're notifying multiple cursors that entries are available; this should be 
able to progress in parallel.

##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
##########
@@ -93,7 +93,7 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
 
         if (!entries.isEmpty()) {
            // There were already some entries that were read before, we can return them
-            cursor.ledger.getExecutor().execute(safeRun(() -> {
+            cursor.ledger.getPinnedExecutor().execute(safeRun(() -> {

Review comment:
       I think we should be careful not to serialize every cursor onto the 
managed ledger's pinned thread, as it could become a bottleneck when there are 
many cursors on a topic.

##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
##########
@@ -141,8 +141,8 @@ void checkReadCompletion() {
                cursor.ledger.startReadOperationOnLedger(nextReadPosition, OpReadEntry.this);
             }
 
-            // Schedule next read in a different thread
-            cursor.ledger.getExecutor().execute(safeRun(() -> {
+            // Schedule next read
+            cursor.ledger.getPinnedExecutor().execute(safeRun(() -> {

Review comment:
       Other than the consideration that different cursors shouldn't be pinned 
on a single thread, the reason for jumping to a different thread here is to 
avoid a stack overflow. 
   
   When the read is being served from the ML cache, the callback comes back on 
the same thread, and there are some conditions in which we immediately ask for 
the next read. 
   
   eg. If you ask to read 100 entries and we only got 20 entries from the 
current ledger, we'll schedule a read for the remaining 80 on the next ledger. 
In some cases there could be abnormal distributions, like 1 entry per ledger, 
which would chain all the reads and callbacks within the same stack. 
   
   Therefore, the "jump to a random thread" was introduced to break that chain.
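   The trampoline idea described above can be sketched with plain `java.util.concurrent` types (this is an illustration of the technique, not Pulsar's actual read path): instead of the callback invoking the next read recursively, each step is re-submitted to an executor, so every step starts from a fresh stack frame and a long chain of cache hits cannot overflow the stack.

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class TrampolineRead {
    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        // Simulate a pathological distribution: 100k "ledgers" with
        // one entry each, all served synchronously (as from a cache).
        AtomicInteger remaining = new AtomicInteger(100_000);
        CountDownLatch done = new CountDownLatch(1);

        Runnable readNext = new Runnable() {
            @Override
            public void run() {
                if (remaining.decrementAndGet() <= 0) {
                    done.countDown();
                    return;
                }
                // Re-submit instead of calling run() directly: the executor
                // queue breaks the call chain, keeping the stack flat.
                executor.execute(this);
            }
        };
        executor.execute(readNext);
        done.await();
        executor.shutdown();
        System.out.println("completed without stack overflow");
    }
}
```

A direct recursive call in place of `executor.execute(this)` would grow the stack by one frame per step and eventually throw `StackOverflowError`.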

##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
##########
@@ -2217,15 +2220,16 @@ private void trimConsumedLedgersInBackground() {
 
     @Override
     public void trimConsumedLedgersInBackground(CompletableFuture<?> promise) {
-        executor.executeOrdered(name, safeRun(() -> internalTrimConsumedLedgers(promise)));
+        pinnedExecutor.execute(safeRun(() -> internalTrimConsumedLedgers(promise)));
     }
 
     public void trimConsumedLedgersInBackground(boolean isTruncate, CompletableFuture<?> promise) {
-        executor.executeOrdered(name, safeRun(() -> internalTrimLedgers(isTruncate, promise)));
+        pinnedExecutor.execute(safeRun(() -> internalTrimLedgers(isTruncate, promise)));
     }
 
     private void scheduleDeferredTrimming(boolean isTruncate, CompletableFuture<?> promise) {
-        scheduledExecutor.schedule(safeRun(() -> trimConsumedLedgersInBackground(isTruncate, promise)), 100, TimeUnit.MILLISECONDS);
+        pinnedScheduledExecutor

Review comment:
       Since `trimConsumedLedgersInBackground()` is already jumping on the 
`pinnedExecutor`, we shouldn't need to use a specific thread for the scheduled 
executor.

##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
##########
@@ -2170,7 +2173,7 @@ void notifyWaitingEntryCallBacks() {
                 break;
             }
 
-            executor.execute(safeRun(cb::entriesAvailable));
+            pinnedExecutor.execute(safeRun(cb::entriesAvailable));

Review comment:
       Same for this one, it should be same to spread into multiple threads.

##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
##########
@@ -298,7 +300,8 @@ public ManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper bookKeeper
         this.ledgerMetadata = LedgerMetadataUtils.buildBaseManagedLedgerMetadata(name);
         this.digestType = BookKeeper.DigestType.fromApiDigestType(config.getDigestType());
         this.scheduledExecutor = scheduledExecutor;
-        this.executor = bookKeeper.getMainWorkerPool();
+        this.pinnedScheduledExecutor = scheduledExecutor.chooseThread(name);
+        this.pinnedExecutor = bookKeeper.getMainWorkerPool().chooseThread(name);

Review comment:
       I don't know why we never did this, but this saves a lot of string 
hashing too :) 
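   The point about string hashing can be illustrated with a simplified stand-in for `OrderedExecutor#chooseThread` (this is not BookKeeper's implementation; the pool and key names are hypothetical): routing by key on every submission hashes the ledger name each time, whereas picking the pinned thread once at construction and caching the `Executor` reference skips that work while preserving ordering.

```java
import java.util.*;
import java.util.concurrent.*;

public class PinnedOnce {
    // Simplified analogue of OrderedExecutor#chooseThread: hash the key to
    // pick one single-threaded executor out of the pool.
    static ExecutorService chooseThread(ExecutorService[] pool, String key) {
        return pool[Math.abs(key.hashCode() % pool.length)];
    }

    public static void main(String[] args) throws Exception {
        ExecutorService[] pool = new ExecutorService[4];
        for (int i = 0; i < pool.length; i++) {
            pool[i] = Executors.newSingleThreadExecutor();
        }
        // Hash the ledger name once, at "construction" time, and keep
        // the resulting executor instead of re-routing per task.
        ExecutorService pinned = chooseThread(pool, "my-managed-ledger");

        Set<String> threads = Collections.synchronizedSet(new HashSet<>());
        CountDownLatch done = new CountDownLatch(100);
        for (int i = 0; i < 100; i++) {
            // No per-submission key hashing; all tasks still run in
            // order on the same pinned thread.
            pinned.execute(() -> {
                threads.add(Thread.currentThread().getName());
                done.countDown();
            });
        }
        done.await();
        for (ExecutorService e : pool) e.shutdown();
        System.out.println("distinct threads: " + threads.size());
    }
}
```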




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
