This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit e778e8ad0e73131d7947319956b3e1c9b8fa3efe
Author: fengyubiao <yubiao.f...@streamnative.io>
AuthorDate: Tue Jan 28 03:26:32 2025 +0800

    [fix][broker] Fix repeatedly acquired pending reads quota (#23869)
    
    (cherry picked from commit 331a997b76b83b3eca777c4559eb60b940d30c27)
---
 .../mledger/impl/cache/PendingReadsManager.java    |   8 +-
 .../mledger/impl/cache/RangeEntryCacheImpl.java    |  26 ++-
 .../impl/InflightReadsLimiterIntegrationTest.java  | 231 +++++++++++++++++++++
 .../impl/cache/PendingReadsManagerTest.java        |   2 +-
 4 files changed, 251 insertions(+), 16 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManager.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManager.java
index 8b2f3e25f1c..d733b54dd13 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManager.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManager.java
@@ -362,7 +362,7 @@ public class PendingReadsManager {
                                     };
                                     rangeEntryCache.asyncReadEntry0(lh,
                                             missingOnRight.startEntry, 
missingOnRight.endEntry,
-                                            shouldCacheEntry, 
readFromRightCallback, null);
+                                            shouldCacheEntry, 
readFromRightCallback, null, false);
                                 }
 
                                 @Override
@@ -372,7 +372,7 @@ public class PendingReadsManager {
                                 }
                             };
                             rangeEntryCache.asyncReadEntry0(lh, 
missingOnLeft.startEntry, missingOnLeft.endEntry,
-                                    shouldCacheEntry, readFromLeftCallback, 
null);
+                                    shouldCacheEntry, readFromLeftCallback, 
null, false);
                         } else if (missingOnLeft != null) {
                             AsyncCallbacks.ReadEntriesCallback 
readFromLeftCallback =
                                     new AsyncCallbacks.ReadEntriesCallback() {
@@ -395,7 +395,7 @@ public class PendingReadsManager {
                                         }
                                     };
                             rangeEntryCache.asyncReadEntry0(lh, 
missingOnLeft.startEntry, missingOnLeft.endEntry,
-                                    shouldCacheEntry, readFromLeftCallback, 
null);
+                                    shouldCacheEntry, readFromLeftCallback, 
null, false);
                         } else if (missingOnRight != null) {
                             AsyncCallbacks.ReadEntriesCallback 
readFromRightCallback =
                                     new AsyncCallbacks.ReadEntriesCallback() {
@@ -418,7 +418,7 @@ public class PendingReadsManager {
                                         }
                                     };
                             rangeEntryCache.asyncReadEntry0(lh, 
missingOnRight.startEntry, missingOnRight.endEntry,
-                                    shouldCacheEntry, readFromRightCallback, 
null);
+                                    shouldCacheEntry, readFromRightCallback, 
null, false);
                         }
                     }
 
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
index 21eb62e5a8c..f378acfba14 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
@@ -56,7 +56,7 @@ public class RangeEntryCacheImpl implements EntryCache {
     /**
      * Overhead per-entry to take into account the envelope.
      */
-    private static final long BOOKKEEPER_READ_OVERHEAD_PER_ENTRY = 64;
+    public static final long BOOKKEEPER_READ_OVERHEAD_PER_ENTRY = 64;
 
     private final RangeEntryCacheManagerImpl manager;
     final ManagedLedgerImpl ml;
@@ -101,7 +101,7 @@ public class RangeEntryCacheImpl implements EntryCache {
     }
 
     @VisibleForTesting
-    InflightReadsLimiter getPendingReadsLimiter() {
+    public InflightReadsLimiter getPendingReadsLimiter() {
         return manager.getInflightReadsLimiter();
     }
 
@@ -281,7 +281,7 @@ public class RangeEntryCacheImpl implements EntryCache {
     public void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, 
boolean shouldCacheEntry,
             final ReadEntriesCallback callback, Object ctx) {
         try {
-            asyncReadEntry0(lh, firstEntry, lastEntry, shouldCacheEntry, 
callback, ctx);
+            asyncReadEntry0(lh, firstEntry, lastEntry, shouldCacheEntry, 
callback, ctx, true);
         } catch (Throwable t) {
             log.warn("failed to read entries for {}--{}-{}", lh.getId(), 
firstEntry, lastEntry, t);
             // invalidate all entries related to ledger from the cache (it 
might happen if entry gets corrupt
@@ -294,16 +294,20 @@ public class RangeEntryCacheImpl implements EntryCache {
 
     @SuppressWarnings({ "unchecked", "rawtypes" })
     void asyncReadEntry0(ReadHandle lh, long firstEntry, long lastEntry, 
boolean shouldCacheEntry,
-            final ReadEntriesCallback callback, Object ctx) {
-        asyncReadEntry0WithLimits(lh, firstEntry, lastEntry, shouldCacheEntry, 
callback, ctx, null);
+            final ReadEntriesCallback callback, Object ctx, boolean 
withLimits) {
+        asyncReadEntry0WithLimits(lh, firstEntry, lastEntry, shouldCacheEntry, 
callback, ctx, null, withLimits);
     }
 
     void asyncReadEntry0WithLimits(ReadHandle lh, long firstEntry, long 
lastEntry, boolean shouldCacheEntry,
-        final ReadEntriesCallback originalCallback, Object ctx, 
InflightReadsLimiter.Handle handle) {
-
-        final AsyncCallbacks.ReadEntriesCallback callback =
-                handlePendingReadsLimits(lh, firstEntry, lastEntry, 
shouldCacheEntry,
-                        originalCallback, ctx, handle);
+        final ReadEntriesCallback originalCallback, Object ctx, 
InflightReadsLimiter.Handle handle,
+                                   boolean withLimits) {
+        AsyncCallbacks.ReadEntriesCallback callback;
+        if (withLimits) {
+            callback = handlePendingReadsLimits(lh, firstEntry, lastEntry, 
shouldCacheEntry, originalCallback, ctx,
+                    handle);
+        } else {
+            callback = originalCallback;
+        }
         if (callback == null) {
             return;
         }
@@ -381,7 +385,7 @@ public class RangeEntryCacheImpl implements EntryCache {
             }
             ml.getExecutor().execute(() -> {
                 asyncReadEntry0WithLimits(lh, firstEntry, lastEntry, 
shouldCacheEntry,
-                        originalCallback, ctx, newHandle);
+                        originalCallback, ctx, newHandle, true);
             });
             return null;
         } else {
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/InflightReadsLimiterIntegrationTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/InflightReadsLimiterIntegrationTest.java
new file mode 100644
index 00000000000..b57dea6a5bb
--- /dev/null
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/InflightReadsLimiterIntegrationTest.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.impl;
+
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.doAnswer;
+import io.netty.util.concurrent.DefaultThreadFactory;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.client.api.LedgerEntries;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
+import org.apache.bookkeeper.mledger.impl.cache.InflightReadsLimiter;
+import org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl;
+import org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheManagerImpl;
+import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
+import org.awaitility.Awaitility;
+import org.awaitility.reflect.WhiteboxImpl;
+import org.mockito.Mockito;
+import org.mockito.stubbing.Answer;
+import org.testng.Assert;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+@Slf4j
+public class InflightReadsLimiterIntegrationTest extends 
MockedBookKeeperTestCase {
+
+    @DataProvider
+    public Object[][] readMissingCases() {
+        return new Object[][]{
+                {"missRight"},
+                {"missLeft"},
+                {"bothMiss"}
+        };
+    }
+
+    @Test(dataProvider = "readMissingCases")
+    public void testPreciseLimitation(String missingCase) throws Exception {
+        final long start1 = 50;
+        final long start2 = "missLeft".endsWith(missingCase) || 
"bothMiss".equals(missingCase) ? 30 : 50;
+        final long end1 = 99;
+        final long end2 = "missRight".endsWith(missingCase) || 
"bothMiss".equals(missingCase) ? 109 : 99;
+        final HashSet<Long> secondReadEntries = new HashSet<>();
+        if (start2 < start1) {
+            secondReadEntries.add(start2);
+        }
+        if (end2 > end1) {
+            secondReadEntries.add(end1 + 1);
+        }
+        final int readCount1 = (int) (end1 - start1 + 1);
+        final int readCount2 = (int) (end2 - start2 + 1);
+
+        final DefaultThreadFactory threadFactory = new 
DefaultThreadFactory(UUID.randomUUID().toString());
+        final ManagedLedgerConfig config = new ManagedLedgerConfig();
+        config.setMaxEntriesPerLedger(100000);
+        ManagedLedgerFactoryConfig factoryConfig = new 
ManagedLedgerFactoryConfig();
+        factoryConfig.setCacheEvictionIntervalMs(3600 * 1000);
+        factoryConfig.setManagedLedgerMaxReadsInFlightSize(1000_000);
+        final ManagedLedgerFactoryImpl factory = new 
ManagedLedgerFactoryImpl(metadataStore, bkc, factoryConfig);
+        final ManagedLedgerImpl ml = (ManagedLedgerImpl) 
factory.open("my_test_ledger", config);
+        final RangeEntryCacheImpl entryCache = (RangeEntryCacheImpl) 
ml.entryCache;
+        final RangeEntryCacheManagerImpl rangeEntryCacheManager =
+                (RangeEntryCacheManagerImpl) factory.getEntryCacheManager();
+        final InflightReadsLimiter limiter = 
rangeEntryCacheManager.getInflightReadsLimiter();
+        final long totalCapacity =limiter.getRemainingBytes();
+        // final ManagedCursorImpl c1 = (ManagedCursorImpl) 
ml.openCursor("c1");
+        for (byte i = 1; i < 127; i++) {
+            log.info("add entry: " + i);
+            ml.addEntry(new byte[]{i});
+        }
+        // Evict cached entries.
+        entryCache.evictEntries(ml.currentLedgerSize);
+        Assert.assertEquals(entryCache.getSize(), 0);
+
+        CountDownLatch readCompleteSignal1 = new CountDownLatch(1);
+        CountDownLatch readCompleteSignal2 = new CountDownLatch(1);
+        CountDownLatch firstReadingStarted = new CountDownLatch(1);
+        LedgerHandle currentLedger = ml.currentLedger;
+        LedgerHandle spyCurrentLedger = Mockito.spy(currentLedger);
+        ml.currentLedger = spyCurrentLedger;
+        Answer answer = invocation -> {
+            long firstEntry = (long) invocation.getArguments()[0];
+            log.info("reading entry: {}", firstEntry);
+            if (firstEntry == start1) {
+                // Wait 3s to make
+                firstReadingStarted.countDown();
+                readCompleteSignal1.await();
+                Object res = invocation.callRealMethod();
+                return res;
+            } else if(secondReadEntries.contains(firstEntry)) {
+                final CompletableFuture res = new CompletableFuture<>();
+                threadFactory.newThread(() -> {
+                    try {
+                        readCompleteSignal2.await();
+                        CompletableFuture<LedgerEntries> future =
+                                (CompletableFuture<LedgerEntries>) 
invocation.callRealMethod();
+                        future.thenAccept(v -> {
+                            res.complete(v);
+                        }).exceptionally(ex -> {
+                            res.completeExceptionally(ex);
+                            return null;
+                        });
+                    } catch (Throwable ex) {
+                        res.completeExceptionally(ex);
+                    }
+                }).start();
+                return res;
+            } else {
+                return invocation.callRealMethod();
+            }
+        };
+        doAnswer(answer).when(spyCurrentLedger).readAsync(anyLong(), 
anyLong());
+        
doAnswer(answer).when(spyCurrentLedger).readUnconfirmedAsync(anyLong(), 
anyLong());
+
+        // Initialize "entryCache.estimatedEntrySize" to the correct value.
+        Object ctx = new Object();
+        SimpleReadEntriesCallback cb0 = new SimpleReadEntriesCallback();
+        entryCache.asyncReadEntry(spyCurrentLedger, 125, 125, true, cb0, ctx);
+        cb0.entries.join();
+        Long sizePerEntry1 = WhiteboxImpl.getInternalState(entryCache, 
"estimatedEntrySize");
+        Assert.assertEquals(sizePerEntry1, 1);
+        Awaitility.await().untilAsserted(() -> {
+            long remainingBytes =limiter.getRemainingBytes();
+            Assert.assertEquals(remainingBytes, totalCapacity);
+        });
+        log.info("remainingBytes 0: {}", limiter.getRemainingBytes());
+
+        // Concurrency reading.
+
+        SimpleReadEntriesCallback cb1 = new SimpleReadEntriesCallback();
+        SimpleReadEntriesCallback cb2 = new SimpleReadEntriesCallback();
+        threadFactory.newThread(() -> {
+            entryCache.asyncReadEntry(spyCurrentLedger, start1, end1, true, 
cb1, ctx);
+        }).start();
+        threadFactory.newThread(() -> {
+            try {
+                firstReadingStarted.await();
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+            entryCache.asyncReadEntry(spyCurrentLedger, start2, end2, true, 
cb2, ctx);
+        }).start();
+
+        long bytesAcquired1 = calculateBytesSizeBeforeFirstReading(readCount1 
+ readCount2, 1);
+        long remainingBytesExpected1 = totalCapacity - bytesAcquired1;
+        log.info("acquired : {}", bytesAcquired1);
+        log.info("remainingBytesExpected 0 : {}", remainingBytesExpected1);
+        Awaitility.await().untilAsserted(() -> {
+            log.info("remainingBytes 0: {}", limiter.getRemainingBytes());
+            Assert.assertEquals(limiter.getRemainingBytes(), 
remainingBytesExpected1);
+        });
+
+        // Complete the read1.
+        Thread.sleep(3000);
+        readCompleteSignal1.countDown();
+        cb1.entries.join();
+        Long sizePerEntry2 = WhiteboxImpl.getInternalState(entryCache, 
"estimatedEntrySize");
+        Assert.assertEquals(sizePerEntry2, 1);
+        long bytesAcquired2 = calculateBytesSizeBeforeFirstReading(readCount2, 
1);
+        long remainingBytesExpected2 = totalCapacity - bytesAcquired2;
+        log.info("acquired : {}", bytesAcquired2);
+        log.info("remainingBytesExpected 1: {}", remainingBytesExpected2);
+        Awaitility.await().untilAsserted(() -> {
+            log.info("remainingBytes 1: {}", limiter.getRemainingBytes());
+            Assert.assertEquals(limiter.getRemainingBytes(), 
remainingBytesExpected2);
+        });
+
+        readCompleteSignal2.countDown();
+        cb2.entries.join();
+        Long sizePerEntry3 = WhiteboxImpl.getInternalState(entryCache, 
"estimatedEntrySize");
+        Assert.assertEquals(sizePerEntry3, 1);
+        Awaitility.await().untilAsserted(() -> {
+            long remainingBytes = limiter.getRemainingBytes();
+            log.info("remainingBytes 2: {}", remainingBytes);
+            Assert.assertEquals(remainingBytes, totalCapacity);
+        });
+        // cleanup
+        ml.delete();
+        factory.shutdown();
+    }
+
+    private long calculateBytesSizeBeforeFirstReading(int entriesCount, int 
perEntrySize) {
+        return entriesCount * (perEntrySize + 
RangeEntryCacheImpl.BOOKKEEPER_READ_OVERHEAD_PER_ENTRY);
+    }
+
+    class SimpleReadEntriesCallback implements 
AsyncCallbacks.ReadEntriesCallback {
+
+        CompletableFuture<List<Byte>> entries = new CompletableFuture<>();
+
+        @Override
+        public void readEntriesComplete(List<Entry> entriesRead, Object ctx) {
+            List<Byte> list = new ArrayList<>(entriesRead.size());
+            for (Entry entry : entriesRead) {
+                byte b = entry.getDataBuffer().readByte();
+                list.add(b);
+                entry.release();
+            }
+            this.entries.complete(list);
+        }
+
+        @Override
+        public void readEntriesFailed(ManagedLedgerException exception, Object 
ctx) {
+            this.entries.completeExceptionally(exception);
+        }
+    }
+}
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManagerTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManagerTest.java
index 6f573ff8d75..75e371fa97c 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManagerTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManagerTest.java
@@ -110,7 +110,7 @@ public class PendingReadsManagerTest  {
                 return null;
             }
         }).when(rangeEntryCache).asyncReadEntry0(any(), anyLong(), anyLong(),
-                anyBoolean(), any(), any());
+                anyBoolean(), any(), any(), anyBoolean());
 
         lh = mock(ReadHandle.class);
         ml = mock(ManagedLedgerImpl.class);

Reply via email to