lhotari commented on code in PR #23901:
URL: https://github.com/apache/pulsar/pull/23901#discussion_r1933485554


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java:
##########
@@ -337,89 +403,36 @@ void asyncReadEntry0WithLimits(ReadHandle lh, long 
firstEntry, long lastEntry, b
 
             manager.mlFactoryMBean.recordCacheHits(entriesToReturn.size(), 
totalCachedSize);
             if (log.isDebugEnabled()) {
-                log.debug("[{}] Ledger {} -- Found in cache entries: {}-{}", 
ml.getName(), ledgerId, firstEntry,
-                        lastEntry);
+                log.debug("[{}] Cache hit for {} entries in range {} to {}", 
ml.getName(), numberOfEntries,
+                        firstPosition, lastPosition);
             }
 
-            callback.readEntriesComplete((List) entriesToReturn, ctx);
+            callback.readEntriesComplete(entriesToReturn, ctx);
 
         } else {
             if (!cachedEntries.isEmpty()) {
                 cachedEntries.forEach(entry -> entry.release());
             }
 
             // Read all the entries from bookkeeper
-            pendingReadsManager.readEntries(lh, firstEntry, lastEntry,
+            pendingReadsManager.readEntries(lh, firstPosition.getEntryId(), 
lastPosition.getEntryId(),
                     shouldCacheEntry, callback, ctx);
-
         }
     }
 
-    private AsyncCallbacks.ReadEntriesCallback 
handlePendingReadsLimits(ReadHandle lh,
-                                                                long 
firstEntry, long lastEntry,
-                                                                boolean 
shouldCacheEntry,
-                                                                
AsyncCallbacks.ReadEntriesCallback originalCallback,
-                                                                Object ctx, 
InflightReadsLimiter.Handle handle) {
-        InflightReadsLimiter pendingReadsLimiter = getPendingReadsLimiter();
-        if (pendingReadsLimiter.isDisabled()) {
-            return originalCallback;
+    @VisibleForTesting
+    public long getEstimatedEntrySize() {
+        long estimatedEntrySize = getAvgEntrySize();
+        if (estimatedEntrySize == 0) {
+            estimatedEntrySize = DEFAULT_ESTIMATED_ENTRY_SIZE;
         }
-        long estimatedReadSize = (1 + lastEntry - firstEntry)
-                * (estimatedEntrySize + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY);
-        final AsyncCallbacks.ReadEntriesCallback callback;
-        InflightReadsLimiter.Handle newHandle = 
pendingReadsLimiter.acquire(estimatedReadSize, handle);
-        if (!newHandle.success) {
-            long now = System.currentTimeMillis();
-            if (now - newHandle.creationTime > readEntryTimeoutMillis) {
-                String message = "Time-out elapsed while acquiring enough 
permits "
-                        + "on the memory limiter to read from ledger "
-                        + lh.getId()
-                        + ", " + getName()
-                        + ", estimated read size " + estimatedReadSize + " 
bytes"
-                        + " for " + (1 + lastEntry - firstEntry)
-                        + " entries (check 
managedLedgerMaxReadsInFlightSizeInMB)";
-                log.error(message);
-                pendingReadsLimiter.release(newHandle);
-                originalCallback.readEntriesFailed(
-                        new 
ManagedLedgerException.TooManyRequestsException(message), ctx);
-                return null;
-            }
-            ml.getExecutor().execute(() -> {
-                asyncReadEntry0WithLimits(lh, firstEntry, lastEntry, 
shouldCacheEntry,
-                        originalCallback, ctx, newHandle, true);
-            });
-            return null;
-        } else {
-            callback = new AsyncCallbacks.ReadEntriesCallback() {
-
-                @Override
-                public void readEntriesComplete(List<Entry> entries, Object 
ctx) {
-                    if (!entries.isEmpty()) {
-                        long size = entries.get(0).getLength();
-                        estimatedEntrySize = size;
-
-                        AtomicInteger remainingCount = new 
AtomicInteger(entries.size());
-                        for (Entry entry : entries) {
-                            ((EntryImpl) entry).onDeallocate(() -> {
-                                if (remainingCount.decrementAndGet() <= 0) {
-                                    pendingReadsLimiter.release(newHandle);
-                                }
-                            });
-                        }
-                    } else {
-                        pendingReadsLimiter.release(newHandle);
-                    }
-                    originalCallback.readEntriesComplete(entries, ctx);
-                }
+        return estimatedEntrySize + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
+    }
 
-                @Override
-                public void readEntriesFailed(ManagedLedgerException 
exception, Object ctx) {
-                    pendingReadsLimiter.release(newHandle);
-                    originalCallback.readEntriesFailed(exception, ctx);
-                }
-            };
-        }
-        return callback;
+    private long getAvgEntrySize() {

Review Comment:
   Good point about checking the performance impact. LongAdder's `sum()` 
operation needs to aggregate across internal cells, but it's designed to be 
efficient: when there is no contention, LongAdder allocates no cells and 
updates only its `base` field, making `sum()` essentially as fast as reading a 
volatile long.
   
   The multi-cell structure is only created once updates actually contend (a 
failed CAS on the `base` field), where the performance trade-off favors reduced 
contention over a slightly more expensive `sum()` operation. The number of 
cells is bounded by the nearest power of two greater than or equal to the 
number of CPUs, limiting the maximum aggregation cost.
   
   Found the documentation in the Striped64 base class [source 
code](https://github.com/openjdk/jdk/blob/master/src/java.base/share/classes/java/util/concurrent/atomic/Striped64.java#L54-L118):
   > ```
   >      * In part because Cells are relatively large, we avoid creating
   >      * them until they are needed.  When there is no contention, all
   >      * updates are made to the base field.  Upon first contention (a
   >      * failed CAS on base update), the table is initialized to size 2.
   >      * The table size is doubled upon further contention until
   >      * reaching the nearest power of two greater than or equal to the
   >      * number of CPUS. Table slots remain empty (null) until they are
   >      * needed.
   > ```
   
   It seems that we should be fine with LongAdder here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to