lhotari commented on code in PR #23983:
URL: https://github.com/apache/pulsar/pull/23983#discussion_r1957923960


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -798,34 +798,30 @@ public void asyncAddEntry(ByteBuf buffer, int numberOfMessages, AddEntryCallback
             log.debug("[{}] asyncAddEntry size={} state={}", name, buffer.readableBytes(), state);
         }
 
-        // retain buffer in this thread
+        // The buffer will be queued in `pendingAddEntries` and might be polled later in a different thread. However,
+        // the caller could release it after this method returns. To ensure the buffer is not released when it's polled,
+        // increase the reference count, which should be decreased by `OpAddEntry`'s methods later.
         buffer.retain();
-
-        // Jump to specific thread to avoid contention from writers writing from different threads
         final var addOperation = OpAddEntry.createNoRetainBuffer(this, buffer, numberOfMessages, callback, ctx,
                 currentLedgerTimeoutTriggered);
         var added = false;
         try {
-            // Use synchronized to ensure if `addOperation` is added to queue and fails later, it will be the first
-            // element in `pendingAddEntries`.
-            synchronized (this) {
-                if (managedLedgerInterceptor != null) {
-                    managedLedgerInterceptor.beforeAddEntry(addOperation, addOperation.getNumberOfMessages());
-                }
-                final var state = STATE_UPDATER.get(this);
-                beforeAddEntryToQueue(state);
-                pendingAddEntries.add(addOperation);
-                added = true;
-                afterAddEntryToQueue(state, addOperation);
+            if (managedLedgerInterceptor != null) {
+                managedLedgerInterceptor.beforeAddEntry(addOperation, addOperation.getNumberOfMessages());
             }
+            beforeAddEntryToQueue();
+            pendingAddEntries.add(addOperation);
+            added = true;
+            afterAddEntryToQueue(addOperation);
         } catch (Throwable throwable) {
             if (!added) {
                 addOperation.failed(ManagedLedgerException.getManagedLedgerException(throwable));
             } // else: all elements of `pendingAddEntries` will fail in another thread
         }
     }
 
-    protected void beforeAddEntryToQueue(State state) throws ManagedLedgerException {
+    protected void beforeAddEntryToQueue() throws ManagedLedgerException {
+        final var state = STATE_UPDATER.get(this);

Review Comment:
   Using `STATE_UPDATER.get(this)` doesn't make any difference for plain reads of the field value; reading `state` directly is equivalent.
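The code comment added in the `asyncAddEntry` hunk above explains why the buffer is retained before the operation is queued in `pendingAddEntries`. As a minimal sketch of that hand-off, the example below uses a hypothetical `RefCountedBuffer` (a stand-in for Netty's `ByteBuf` reference counting, not the real class) and a plain `BlockingQueue` in place of `pendingAddEntries`. The caller drops its own reference as soon as the call returns, while the queued operation keeps the buffer alive until the consumer thread releases it.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a reference-counted buffer such as Netty's ByteBuf.
final class RefCountedBuffer {
    private final AtomicInteger refCnt = new AtomicInteger(1);
    private volatile boolean freed = false;

    RefCountedBuffer retain() {
        while (true) {
            int current = refCnt.get();
            if (current <= 0) {
                throw new IllegalStateException("retain() on a released buffer");
            }
            if (refCnt.compareAndSet(current, current + 1)) {
                return this;
            }
        }
    }

    void release() {
        int remaining = refCnt.decrementAndGet();
        if (remaining == 0) {
            freed = true; // a real buffer would return its memory to the pool here
        } else if (remaining < 0) {
            throw new IllegalStateException("release() called too many times");
        }
    }

    int refCnt() { return refCnt.get(); }
    boolean isFreed() { return freed; }
}

// Hypothetical simplification of the enqueue/poll hand-off in asyncAddEntry.
final class HandOffDemo {
    private final BlockingQueue<RefCountedBuffer> pending = new LinkedBlockingQueue<>();

    // Producer side: the queued operation takes its own reference.
    void asyncAdd(RefCountedBuffer buffer) {
        buffer.retain();
        pending.add(buffer);
    }

    // Consumer side: runs on another thread and drops the queued reference.
    Thread startConsumer() {
        Thread t = new Thread(() -> {
            try {
                RefCountedBuffer b = pending.take();
                // ... write b to storage ...
                b.release();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        t.start();
        return t;
    }

    static boolean run() {
        HandOffDemo demo = new HandOffDemo();
        RefCountedBuffer buffer = new RefCountedBuffer();
        demo.asyncAdd(buffer);
        buffer.release(); // the caller releases right after the call returns
        boolean aliveWhileQueued = !buffer.isFreed();
        Thread consumer = demo.startConsumer();
        try {
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return aliveWhileQueued && buffer.isFreed() && buffer.refCnt() == 0;
    }
}
```

`HandOffDemo.run()` returns `true` when the buffer survives the caller's `release()` and is freed only after the consumer thread's `release()`.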



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -836,7 +832,9 @@ protected void beforeAddEntryToQueue(State state) throws ManagedLedgerException
         }
     }
 
-    protected void afterAddEntryToQueue(State state, OpAddEntry addOperation) throws ManagedLedgerException {
+    // TODO: does this method really need to be synchronized?
+    protected synchronized void afterAddEntryToQueue(OpAddEntry addOperation) throws ManagedLedgerException {
+        final var state = STATE_UPDATER.get(this);

Review Comment:
   Using `STATE_UPDATER.get(this)` doesn't make any difference for plain reads of the field value; reading `state` directly is equivalent.
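To illustrate the `STATE_UPDATER` comments: assuming `STATE_UPDATER` is an `AtomicReferenceFieldUpdater` over the `state` field, the JDK requires that field to be `volatile` (`newUpdater` throws `IllegalArgumentException` otherwise). `STATE_UPDATER.get(this)` therefore performs the same volatile read as reading `state` directly; the updater only adds value for atomic read-modify-write operations such as `compareAndSet`. `StateHolder` and its `State` values below are hypothetical stand-ins, not `ManagedLedgerImpl` itself.

```java
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

// Hypothetical stand-in for a class whose state field is managed by an updater.
final class StateHolder {
    enum State { OPEN, CLOSED }

    private static final AtomicReferenceFieldUpdater<StateHolder, State> STATE_UPDATER =
            AtomicReferenceFieldUpdater.newUpdater(StateHolder.class, State.class, "state");

    // newUpdater() rejects non-volatile fields, so a plain read is already a volatile read.
    private volatile State state = State.OPEN;

    // These two reads are equivalent: both see the latest published value.
    State readViaUpdater() { return STATE_UPDATER.get(this); }
    State readDirectly() { return state; }

    // The updater matters for atomic transitions like this one.
    boolean close() {
        return STATE_UPDATER.compareAndSet(this, State.OPEN, State.CLOSED);
    }
}
```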



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -671,8 +671,15 @@ public void updateSubscribeRateLimiter() {
     }
 
     private void asyncAddEntry(ByteBuf headersAndPayload, PublishContext publishContext) {
-        ledger.asyncAddEntry(headersAndPayload,
-            (int) publishContext.getNumberOfMessages(), this, publishContext);
+        // Retain the buffer in advance to avoid the buffer might have been released when it's passed to `asyncAddEntry`
+        final var buffer = headersAndPayload.retain();
+        try {
+            ledger.getExecutor().execute(() -> ledger.asyncAddEntry(buffer, (int) publishContext.getNumberOfMessages(),
+                    this, publishContext));

Review Comment:
   There are 2 different aspects to consider: thread safety and ordering.
   
   Regarding "I still think the synchronization should be performed from the caller":
   In Java, synchronization is not only about performing operations one at a time under a mutually exclusive lock. "Visibility" is also an important aspect of Java thread safety. That's why it doesn't make sense for callers to synchronize calls to `asyncAddEntry`: every caller would need to use the same lock to get both ordering and thread safety.
   
   Snippet from "Java Concurrency in Practice", Chapter 2 "Thread safety":
   > It is a common mistake to assume that synchronization needs to be used only when writing to shared variables; this is simply not true. 
   > For each mutable state variable that may be accessed by more than one thread, all accesses to that variable must be performed **with the same lock held**. In this case, we say that the variable is guarded by that lock.
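   A minimal sketch of the quoted rule, with hypothetical names: `count` is guarded by `lock`, so the read in `get()` takes the same lock as the write in `increment()`. Removing the lock from `get()` alone would keep the writes mutually exclusive but would leave readers with no guarantee of seeing the latest value.

```java
// Hypothetical counter whose only mutable state is guarded by one private lock.
final class GuardedCounter {
    private final Object lock = new Object();
    private long count; // guarded by `lock`

    void increment() {
        synchronized (lock) {
            count++;
        }
    }

    long get() {
        // Reads take the same lock as writes; this provides visibility, not just exclusion.
        synchronized (lock) {
            return count;
        }
    }

    // Starts `threads` workers that each increment `incrementsPerThread` times.
    static long demo(int threads, int incrementsPerThread) {
        GuardedCounter counter = new GuardedCounter();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < incrementsPerThread; j++) {
                    counter.increment();
                }
            });
            workers[i].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return counter.get();
    }
}
```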
   
   How a ManagedLedger implementation achieves ordering guarantees and thread safety is an internal implementation detail. In the case of ManagedLedger, it doesn't make sense to delegate the responsibility for thread safety to the caller.
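   As a sketch of keeping ordering and thread safety inside the implementation, the hypothetical `OrderedAppender` below funnels every append through its own single-threaded executor. Callers invoke `asyncAppend` from any thread without sharing a lock or knowing that the executor exists; appends from one caller thread keep their call order, and the internal list is only touched by the executor thread. This is not how `ManagedLedgerImpl` is implemented; it only illustrates the API-design point.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical API that owns its threading model: callers never see the executor.
final class OrderedAppender implements AutoCloseable {
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private final List<String> entries = new ArrayList<>(); // confined to the executor thread

    CompletableFuture<Integer> asyncAppend(String entry) {
        // Tasks run one at a time in submission order, so appends from a
        // single caller thread keep their order and `entries` needs no lock.
        return CompletableFuture.supplyAsync(() -> {
            entries.add(entry);
            return entries.size() - 1; // position of the appended entry
        }, executor);
    }

    CompletableFuture<List<String>> snapshot() {
        return CompletableFuture.supplyAsync(() -> List.copyOf(entries), executor);
    }

    @Override
    public void close() {
        executor.shutdown();
    }
}
```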
   
   Another downside of the `ledger.getExecutor().execute` solution is that it exposes internal implementation details that callers of the API must be aware of. Exposing such implementation details is not great API design.
   
   I think we need to consider a different solution. #23940 contains some code examples of what needs to be solved. It seems to me that this could be solved with the `Object ctx` parameter, by passing a `ctx` that the interceptor also understands. @BewareMyPower, would you be able to research that type of solution instead?
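   One possible shape of the `ctx`-based idea, as a hedged sketch only: the caller wraps whatever the interceptor needs into a context object that travels with the entry, so the interceptor reads it per entry instead of relying on caller-side synchronization. `InterceptorAwareCtx`, `EntryInterceptor`, `RecordingInterceptor`, and the `extraMetadata` field are all hypothetical; they are not the real `ManagedLedgerInterceptor` API, and `extraMetadata` is a placeholder because the details from #23940 are not shown here.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical context understood by both the publisher and the interceptor.
// `extraMetadata` is a placeholder for whatever per-entry data is needed.
record InterceptorAwareCtx(Object originalCtx, String extraMetadata) { }

// Hypothetical interceptor hook that receives the per-entry ctx.
interface EntryInterceptor {
    void beforeAddEntry(Object ctx, int numberOfMessages);
}

final class RecordingInterceptor implements EntryInterceptor {
    final List<String> seen = new CopyOnWriteArrayList<>();

    @Override
    public void beforeAddEntry(Object ctx, int numberOfMessages) {
        if (ctx instanceof InterceptorAwareCtx aware) {
            // The data travels with the entry itself, so no caller-side lock
            // is needed to pair it with the right entry.
            seen.add(aware.extraMetadata() + ":" + numberOfMessages);
        } else {
            // Contexts the interceptor doesn't recognize fall back to default behavior.
            seen.add("default:" + numberOfMessages);
        }
    }
}
```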


