codelipenghui commented on code in PR #18245:
URL: https://github.com/apache/pulsar/pull/18245#discussion_r1019032971


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java:
##########
@@ -44,6 +44,8 @@ protected EntryImpl newObject(Handle<EntryImpl> handle) {
     private long entryId;
     ByteBuf data;
 
+    private Runnable onDellocate;

Review Comment:
   ```suggestion
       private Runnable onDeallocate;
   ```



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java:
##########
@@ -63,6 +66,7 @@ public static EntryImpl create(long ledgerId, long entryId, 
byte[] data) {
         entry.entryId = entryId;
         entry.data = Unpooled.wrappedBuffer(data);
         entry.setRefCnt(1);
+        entry.onDellocate = null;

Review Comment:
   We can remove this line, default should be null.



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java:
##########
@@ -313,6 +349,74 @@ void asyncReadEntry0(ReadHandle lh, long firstEntry, long 
lastEntry, boolean sho
         }
     }
 
+    private AsyncCallbacks.ReadEntriesCallback 
handlePendingReadsLimits(ReadHandle lh,
+                                                                long 
firstEntry, long lastEntry,
+                                                                boolean 
shouldCacheEntry,
+                                                                
AsyncCallbacks.ReadEntriesCallback originalCallback,
+                                                                Object ctx, 
InflightReadsLimiter.Handle handle) {
+        InflightReadsLimiter pendingReadsLimiter = getPendingReadsLimiter();
+        if (pendingReadsLimiter.isDisabled()) {
+            return originalCallback;
+        }
+        long estimatedReadSize = (1 + lastEntry - firstEntry)
+                * (estimatedEntrySize + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY);
+        final AsyncCallbacks.ReadEntriesCallback callback;
+        InflightReadsLimiter.Handle newHandle = 
pendingReadsLimiter.acquire(estimatedReadSize, handle);
+        if (!newHandle.success) {
+            long now = System.currentTimeMillis();
+            if (now - newHandle.creationTime > readEntryTimeoutMillis) {
+                String message = "Time-out elapsed while acquiring enough 
permits "
+                        + "on the memory limiter to read from ledger "
+                        + lh.getId()
+                        + ", " + getName()
+                        + ", estimated read size " + estimatedReadSize + " 
bytes"
+                        + " for " + (1 + lastEntry - firstEntry)
+                        + " entries (check 
managedLedgerMaxReadsInFlightSizeInMB)";
+                log.error(message);
+                pendingReadsLimiter.release(newHandle);
+                originalCallback.readEntriesFailed(
+                        new 
ManagedLedgerException.TooManyRequestsException(message), ctx);
+                return null;
+            }
+            ml.getExecutor().submitOrdered(lh.getId(), () -> {
+                asyncReadEntry0WithLimits(lh, firstEntry, lastEntry, 
shouldCacheEntry,
+                        originalCallback, ctx, newHandle);
+                return null;
+            });
+            return null;
+        } else {
+            callback = new AsyncCallbacks.ReadEntriesCallback() {
+
+                @Override
+                public void readEntriesComplete(List<Entry> entries, Object 
ctx) {
+                    if (!entries.isEmpty()) {
+                        long size = entries.get(0).getLength();
+                        estimatedEntrySize = size;

Review Comment:
   Can we use the `avgMessagesPerEntry` from the consumer?
   The `RangeEntryCacheImpl.java` is shared across all the topics. If 
calculated at the topic level, we should be able to get a more precise 
estimated entry size.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to