merlimat commented on a change in pull request #11387:
URL: https://github.com/apache/pulsar/pull/11387#discussion_r693112234
##########
File path:
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
##########
@@ -252,7 +253,8 @@
protected volatile State state = null;
private final OrderedScheduler scheduledExecutor;
- private final OrderedExecutor executor;
+ private final ScheduledExecutorService pinnedScheduledExecutor;
+ private final Executor pinnedExecutor;
Review comment:
If we are using 2 threads, one from the regular executor (which is more efficient) and the other for the `pinnedScheduledExecutor`, wouldn't that mean that we still have more than one thread accessing some of the objects?
Would it make sense to use the generic `scheduledExecutor` (just for
##########
File path:
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
##########
@@ -2159,7 +2162,7 @@ void notifyCursors() {
break;
}
- executor.execute(safeRun(waitingCursor::notifyEntriesAvailable));
+ pinnedExecutor.execute(safeRun(waitingCursor::notifyEntriesAvailable));
Review comment:
Is this required to be on the same executor?
We're notifying multiple cursors that entries are available; this should be able to progress in parallel.
##########
File path:
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
##########
@@ -93,7 +93,7 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
if (!entries.isEmpty()) {
// There were already some entries that were read before, we can return them
- cursor.ledger.getExecutor().execute(safeRun(() -> {
+ cursor.ledger.getPinnedExecutor().execute(safeRun(() -> {
Review comment:
I think we should be careful not to serialize every cursor onto the managed ledger's pinned thread, as it could become a bottleneck when there are many cursors on a topic.
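This comment and the one on `notifyCursors()` both point toward routing per cursor rather than per ledger. A minimal sketch, assuming a hypothetical `KeyedExecutor` (not BookKeeper's `OrderedExecutor`) that picks a single-threaded slot from the key's hash: work for one cursor stays ordered on one thread, while different cursors can be served in parallel. `CursorNotifier` and `notifyWaitingCursors` are illustrative names.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical keyed pool: one single-threaded executor per slot, slot chosen from
// the key's hash. Same key -> same thread (ordered); different keys may run in parallel.
final class KeyedExecutor {
    private final ExecutorService[] threads;

    KeyedExecutor(int n) {
        threads = new ExecutorService[n];
        for (int i = 0; i < n; i++) {
            String name = "worker-" + i;
            threads[i] = Executors.newSingleThreadExecutor(r -> {
                Thread t = new Thread(r, name);
                t.setDaemon(true); // don't keep the JVM alive in this sketch
                return t;
            });
        }
    }

    int slotFor(Object key) {
        return Math.floorMod(key.hashCode(), threads.length);
    }

    void executeOrdered(Object key, Runnable task) {
        threads[slotFor(key)].execute(task);
    }
}

final class CursorNotifier {
    // Delivers one "entries available" notification per cursor, routed by cursor name
    // instead of through a single per-ledger thread. Returns cursor -> delivering thread.
    static Map<String, String> notifyWaitingCursors(KeyedExecutor executor, List<String> cursors) {
        Map<String, String> deliveredOn = new ConcurrentHashMap<>();
        CountDownLatch done = new CountDownLatch(cursors.size());
        for (String cursor : cursors) {
            executor.executeOrdered(cursor, () -> {
                deliveredOn.put(cursor, Thread.currentThread().getName());
                done.countDown();
            });
        }
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return deliveredOn;
    }
}
```

With four slots, the cursor names `c0` to `c3` hash to four different threads, so their notifications are not queued behind each other, while repeated work for the same cursor still lands on the same thread.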
##########
File path:
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
##########
@@ -141,8 +141,8 @@ void checkReadCompletion() {
cursor.ledger.startReadOperationOnLedger(nextReadPosition, OpReadEntry.this);
}
- // Schedule next read in a different thread
- cursor.ledger.getExecutor().execute(safeRun(() -> {
+ // Schedule next read
+ cursor.ledger.getPinnedExecutor().execute(safeRun(() -> {
Review comment:
Other than the consideration that different cursors shouldn't be pinned on a single thread, the reason for jumping to a different thread here is to avoid a stack overflow.
When the read is served from the ML cache, the callback comes back on the same thread. There are some conditions in which we then ask for the next read, e.g. if you ask to read 100 entries and we only got 20 entries from the current ledger, we'll schedule a read for the remaining 80 on the next ledger. In some cases there could be abnormal distributions, like 1 entry per ledger, and that would chain all the reads and callbacks within the same stack.
Therefore, the "jump to a random thread" was introduced to break that chain.
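To illustrate the stack-growth concern, here is a minimal sketch under simplifying assumptions: every hypothetical ledger holds a single entry, and `readFromCache` invokes its callback inline, as a cache hit would. `ChainedReads`, `readFromCache` and `readRemaining` are illustrative names, not the `OpReadEntry` API. The sketch records the deepest stack observed with and without handing the next read to an executor.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;

final class ChainedReads {
    interface ReadCallback {
        void readComplete(int entriesRead);
    }

    // Hypothetical cache hit: each ledger yields one entry and the callback runs
    // inline, on the caller's stack.
    static void readFromCache(ReadCallback cb) {
        cb.readComplete(1);
    }

    // Reads `remaining` entries one ledger at a time. With hop == null the next read is
    // issued directly from the callback, so the stack keeps growing; otherwise it is
    // handed to `hop`, which lets the current frames return before the next read runs.
    static void readRemaining(int remaining, Executor hop, AtomicInteger maxDepth,
                              CompletableFuture<Void> done) {
        maxDepth.accumulateAndGet(Thread.currentThread().getStackTrace().length, Math::max);
        if (remaining == 0) {
            done.complete(null);
            return;
        }
        readFromCache(n -> {
            Runnable next = () -> readRemaining(remaining - n, hop, maxDepth, done);
            if (hop == null) {
                next.run();
            } else {
                hop.execute(next);
            }
        });
    }

    // Returns the deepest stack (in frames) observed while reading `entries` entries.
    static int maxStackDepth(int entries, Executor hop) {
        AtomicInteger maxDepth = new AtomicInteger();
        CompletableFuture<Void> done = new CompletableFuture<>();
        readRemaining(entries, hop, maxDepth, done);
        done.join();
        return maxDepth.get();
    }
}
```

Without a hop, the stack depth grows linearly with the number of one-entry ledgers; with a hop it stays roughly constant. In this sketch even a single-threaded executor keeps the stack bounded, because `execute()` queues the next read instead of running it inline, so the choice of which thread to hop to mainly matters for the parallelism concern above.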
##########
File path:
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
##########
@@ -2217,15 +2220,16 @@ private void trimConsumedLedgersInBackground() {
@Override
public void trimConsumedLedgersInBackground(CompletableFuture<?> promise) {
- executor.executeOrdered(name, safeRun(() -> internalTrimConsumedLedgers(promise)));
+ pinnedExecutor.execute(safeRun(() -> internalTrimConsumedLedgers(promise)));
}
public void trimConsumedLedgersInBackground(boolean isTruncate, CompletableFuture<?> promise) {
- executor.executeOrdered(name, safeRun(() -> internalTrimLedgers(isTruncate, promise)));
+ pinnedExecutor.execute(safeRun(() -> internalTrimLedgers(isTruncate, promise)));
}
private void scheduleDeferredTrimming(boolean isTruncate, CompletableFuture<?> promise) {
- scheduledExecutor.schedule(safeRun(() -> trimConsumedLedgersInBackground(isTruncate, promise)), 100, TimeUnit.MILLISECONDS);
+ pinnedScheduledExecutor
Review comment:
Since `trimConsumedLedgersInBackground()` already jumps onto the `pinnedExecutor`, we shouldn't need to use a specific thread for the scheduled executor.
##########
File path:
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
##########
@@ -2170,7 +2173,7 @@ void notifyWaitingEntryCallBacks() {
break;
}
- executor.execute(safeRun(cb::entriesAvailable));
+ pinnedExecutor.execute(safeRun(cb::entriesAvailable));
Review comment:
Same for this one: it should be safe to spread these across multiple threads.
##########
File path:
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
##########
@@ -298,7 +300,8 @@ public ManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper bookKeeper
this.ledgerMetadata = LedgerMetadataUtils.buildBaseManagedLedgerMetadata(name);
this.digestType = BookKeeper.DigestType.fromApiDigestType(config.getDigestType());
this.scheduledExecutor = scheduledExecutor;
- this.executor = bookKeeper.getMainWorkerPool();
+ this.pinnedScheduledExecutor = scheduledExecutor.chooseThread(name);
+ this.pinnedExecutor = bookKeeper.getMainWorkerPool().chooseThread(name);
Review comment:
I don't know why we never did this, but this saves a lot of string hashing too :)
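A rough sketch of where the saving comes from, assuming a hypothetical `WorkerPool` that only mimics the shape of an ordered pool (it is not BookKeeper's `OrderedExecutor`): routing every task by name repeats the key-to-thread lookup per submission, while choosing the thread once in the constructor does it a single time. The sketch counts lookups rather than measuring their cost; `PinnedVsPerTask` and `routingCallsFor` are illustrative names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for an ordered worker pool.
final class WorkerPool {
    private final ExecutorService[] threads;
    // Counts how many times a key was hashed to pick a thread.
    final AtomicInteger routingCalls = new AtomicInteger();

    WorkerPool(int n) {
        threads = new ExecutorService[n];
        for (int i = 0; i < n; i++) {
            String name = "pool-" + i;
            threads[i] = Executors.newSingleThreadExecutor(r -> {
                Thread t = new Thread(r, name);
                t.setDaemon(true); // don't keep the JVM alive in this sketch
                return t;
            });
        }
    }

    // Hashes the key and returns the thread it maps to.
    Executor chooseThread(Object key) {
        routingCalls.incrementAndGet();
        return threads[Math.floorMod(key.hashCode(), threads.length)];
    }

    // Per-task routing: the key is hashed again for every submitted task.
    void executeOrdered(Object key, Runnable task) {
        chooseThread(key).execute(task);
    }
}

final class PinnedVsPerTask {
    // Submits `tasks` no-op tasks for `ledgerName`, either routing each one by name or
    // through an executor chosen once up front. Returns the number of routing calls.
    static int routingCallsFor(boolean pinned, String ledgerName, int tasks) {
        WorkerPool pool = new WorkerPool(8);
        CountDownLatch done = new CountDownLatch(tasks);
        Executor pinnedExecutor = pinned ? pool.chooseThread(ledgerName) : null;
        for (int i = 0; i < tasks; i++) {
            if (pinned) {
                pinnedExecutor.execute(done::countDown);
            } else {
                pool.executeOrdered(ledgerName, done::countDown);
            }
        }
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return pool.routingCalls.get();
    }
}
```

Both variants run every task for the ledger on the same thread; only the number of key lookups differs (one per task versus one in total).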