lhotari commented on code in PR #23983:
URL: https://github.com/apache/pulsar/pull/23983#discussion_r1957923960
##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -798,34 +798,30 @@ public void asyncAddEntry(ByteBuf buffer, int numberOfMessages, AddEntryCallback
log.debug("[{}] asyncAddEntry size={} state={}", name, buffer.readableBytes(), state);
}
- // retain buffer in this thread
+ // The buffer will be queued in `pendingAddEntries` and might be polled later in a different thread. However,
+ // the caller could release it after this method returns. To ensure the buffer is not released when it's polled,
+ // increase the reference count, which should be decreased by `OpAddEntry`'s methods later.
buffer.retain();
-
- // Jump to specific thread to avoid contention from writers writing from different threads
final var addOperation = OpAddEntry.createNoRetainBuffer(this, buffer, numberOfMessages, callback, ctx,
currentLedgerTimeoutTriggered);
var added = false;
try {
- // Use synchronized to ensure if `addOperation` is added to queue and fails later, it will be the first
- // element in `pendingAddEntries`.
- synchronized (this) {
- if (managedLedgerInterceptor != null) {
- managedLedgerInterceptor.beforeAddEntry(addOperation, addOperation.getNumberOfMessages());
- }
- final var state = STATE_UPDATER.get(this);
- beforeAddEntryToQueue(state);
- pendingAddEntries.add(addOperation);
- added = true;
- afterAddEntryToQueue(state, addOperation);
+ if (managedLedgerInterceptor != null) {
+ managedLedgerInterceptor.beforeAddEntry(addOperation, addOperation.getNumberOfMessages());
}
+ beforeAddEntryToQueue();
+ pendingAddEntries.add(addOperation);
+ added = true;
+ afterAddEntryToQueue(addOperation);
} catch (Throwable throwable) {
if (!added) {
addOperation.failed(ManagedLedgerException.getManagedLedgerException(throwable));
} // else: all elements of `pendingAddEntries` will fail in another thread
}
}
- protected void beforeAddEntryToQueue(State state) throws ManagedLedgerException {
+ protected void beforeAddEntryToQueue() throws ManagedLedgerException {
+ final var state = STATE_UPDATER.get(this);
Review Comment:
Using `STATE_UPDATER` makes no difference for a plain read of the field value;
`STATE_UPDATER.get(this)` is equivalent to reading the `state` field directly.
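To illustrate the point about plain reads, here is a minimal standalone sketch. It is not the Pulsar code: `StateReadDemo` and its simplified `State` enum are hypothetical names. It relies only on the documented requirement that a field handled by `AtomicReferenceFieldUpdater` must be `volatile`, so a direct read already has the same memory semantics as `get`.

```java
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

// Hypothetical demo class, not part of Pulsar.
class StateReadDemo {
    enum State { None, LedgerOpened, Closed }

    // The updater only accepts volatile fields, so reading `state` directly
    // is already a volatile read.
    private volatile State state = State.None;

    private static final AtomicReferenceFieldUpdater<StateReadDemo, State> STATE_UPDATER =
            AtomicReferenceFieldUpdater.newUpdater(StateReadDemo.class, State.class, "state");

    // Read through the updater.
    State readViaUpdater() {
        return STATE_UPDATER.get(this);
    }

    // Read the field directly; same value, same visibility guarantees.
    State readDirectly() {
        return state;
    }

    // The updater is needed for atomic read-modify-write operations such as this one.
    boolean open() {
        return STATE_UPDATER.compareAndSet(this, State.None, State.LedgerOpened);
    }

    public static void main(String[] args) {
        StateReadDemo demo = new StateReadDemo();
        demo.open();
        System.out.println(demo.readViaUpdater() == demo.readDirectly());
    }
}
```

The updater matters for `compareAndSet` and similar operations; for a read alone, the direct field access is shorter and behaves the same.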
##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -836,7 +832,9 @@ protected void beforeAddEntryToQueue(State state) throws ManagedLedgerException
}
}
- protected void afterAddEntryToQueue(State state, OpAddEntry addOperation) throws ManagedLedgerException {
+ // TODO: does this method really need to be synchronized?
+ protected synchronized void afterAddEntryToQueue(OpAddEntry addOperation) throws ManagedLedgerException {
+ final var state = STATE_UPDATER.get(this);
Review Comment:
Using `STATE_UPDATER` makes no difference for a plain read of the field value;
`STATE_UPDATER.get(this)` is equivalent to reading the `state` field directly.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -671,8 +671,15 @@ public void updateSubscribeRateLimiter() {
}
private void asyncAddEntry(ByteBuf headersAndPayload, PublishContext publishContext) {
- ledger.asyncAddEntry(headersAndPayload,
- (int) publishContext.getNumberOfMessages(), this, publishContext);
+ // Retain the buffer in advance to avoid the buffer might have been released when it's passed to `asyncAddEntry`
+ final var buffer = headersAndPayload.retain();
+ try {
+ ledger.getExecutor().execute(() -> ledger.asyncAddEntry(buffer, (int) publishContext.getNumberOfMessages(),
+ this, publishContext));
Review Comment:
There are two aspects to consider: thread safety and ordering.
Regarding "I still think the synchronization should be performed from the
caller":
In Java, synchronization is not only about performing operations one at a time
under a mutually exclusive lock; visibility is an equally important aspect of
thread safety. That is why it doesn't make sense for callers to synchronize
calls to `asyncAddEntry`: every caller would have to use the same lock to get
both ordering and thread safety.
Snippet from "Java Concurrency in Practice", Chapter 2 "Thread safety":
> It is a common mistake to assume that synchronization needs to be used only when writing to shared variables; this is simply not true.
> For each mutable state variable that may be accessed by more than one thread, all accesses to that variable must be performed **with the same lock held**. In this case, we say that the variable is guarded by that lock.
How a ManagedLedger implementation achieves ordering guarantees and thread
safety is an internal implementation detail. In the case of ManagedLedger, it
doesn't make sense to delegate the responsibility of thread safety to the
caller.
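The "same lock" rule quoted above can be sketched in a few lines. This is a generic illustration, not ManagedLedger code: `GuardedQueueDemo` and its methods are hypothetical. The class owns its lock and guards every access to the queue with it, so callers need no knowledge of the locking scheme.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical demo class, not part of Pulsar.
class GuardedQueueDemo {
    // Guarded by `this`: every read and write below holds the same intrinsic lock.
    private final Queue<String> pending = new ArrayDeque<>();

    synchronized void add(String entry) {
        pending.add(entry);
    }

    synchronized String poll() {
        return pending.poll();
    }

    // Reads take the lock too; this is what provides visibility, not just mutual exclusion.
    synchronized int size() {
        return pending.size();
    }

    public static void main(String[] args) throws InterruptedException {
        GuardedQueueDemo demo = new GuardedQueueDemo();
        Thread[] writers = new Thread[4];
        for (int i = 0; i < writers.length; i++) {
            final int id = i;
            writers[i] = new Thread(() -> {
                for (int j = 0; j < 1000; j++) {
                    demo.add("w" + id + "-" + j);
                }
            });
            writers[i].start();
        }
        for (Thread t : writers) {
            t.join();
        }
        System.out.println(demo.size()); // prints 4000
    }
}
```

If each caller instead wrapped `add` in its own lock, the queue would not be guarded by any single lock, and neither ordering nor visibility would be guaranteed across callers.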
Another downside of the `ledger.getExecutor().execute` solution is that it
exposes internal implementation details that callers of the API must be aware
of, which is not good API design.
I think we need to consider a different solution. I can see some code
examples in #23940 of what needs to be solved. To me, it seems this could be
solved with the `Object ctx` parameter by passing a `ctx` that is also
understood by the interceptor. @BewareMyPower, would you be able to research
that type of solution instead?
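One possible shape of such a `ctx`-based approach, as a rough sketch only: every type below (`InterceptableContext`, `PublishCtx`, `Ledger`) is a hypothetical stand-in, not an existing Pulsar or BookKeeper API, and the real interceptor contract may need something quite different. The idea shown is that the caller passes an opaque context object and the ledger decides internally when, and under which lock, the interceptor hook sees it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; none of these types exist in Pulsar.
class CtxInterceptorSketch {
    // Hypothetical contract: a ctx that the interception step knows how to use.
    interface InterceptableContext {
        void onBeforeAdd(int numberOfMessages);
    }

    // Hypothetical stand-in for a publish context; it records what the hook saw.
    static class PublishCtx implements InterceptableContext {
        final List<Integer> seen = new ArrayList<>();

        @Override
        public void onBeforeAdd(int numberOfMessages) {
            seen.add(numberOfMessages);
        }
    }

    // Hypothetical stand-in for the ledger. Thread safety stays internal:
    // the caller never touches the executor or the lock.
    static class Ledger {
        private final List<byte[]> pendingEntries = new ArrayList<>();

        synchronized void asyncAddEntry(byte[] data, int numberOfMessages, Object ctx) {
            if (ctx instanceof InterceptableContext) {
                ((InterceptableContext) ctx).onBeforeAdd(numberOfMessages);
            }
            pendingEntries.add(data);
        }

        synchronized int pendingCount() {
            return pendingEntries.size();
        }
    }

    public static void main(String[] args) {
        PublishCtx ctx = new PublishCtx();
        Ledger ledger = new Ledger();
        ledger.asyncAddEntry(new byte[] {1, 2, 3}, 3, ctx);
        System.out.println(ctx.seen + " pending=" + ledger.pendingCount());
    }
}
```

With this shape the caller's code stays a single `asyncAddEntry` call, and any change to the ledger's threading model does not leak into `PersistentTopic`.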