This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 331a997b76b [fix][broker] Fix repeatedly acquired pending reads quota
(#23869)
331a997b76b is described below
commit 331a997b76b83b3eca777c4559eb60b940d30c27
Author: fengyubiao <[email protected]>
AuthorDate: Tue Jan 28 03:26:32 2025 +0800
[fix][broker] Fix repeatedly acquired pending reads quota (#23869)
---
.../mledger/impl/cache/PendingReadsManager.java | 8 +-
.../mledger/impl/cache/RangeEntryCacheImpl.java | 26 ++-
.../impl/InflightReadsLimiterIntegrationTest.java | 231 +++++++++++++++++++++
.../impl/cache/PendingReadsManagerTest.java | 2 +-
4 files changed, 251 insertions(+), 16 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManager.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManager.java
index 8b2f3e25f1c..d733b54dd13 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManager.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManager.java
@@ -362,7 +362,7 @@ public class PendingReadsManager {
};
rangeEntryCache.asyncReadEntry0(lh,
missingOnRight.startEntry,
missingOnRight.endEntry,
- shouldCacheEntry,
readFromRightCallback, null);
+ shouldCacheEntry,
readFromRightCallback, null, false);
}
@Override
@@ -372,7 +372,7 @@ public class PendingReadsManager {
}
};
rangeEntryCache.asyncReadEntry0(lh,
missingOnLeft.startEntry, missingOnLeft.endEntry,
- shouldCacheEntry, readFromLeftCallback,
null);
+ shouldCacheEntry, readFromLeftCallback,
null, false);
} else if (missingOnLeft != null) {
AsyncCallbacks.ReadEntriesCallback
readFromLeftCallback =
new AsyncCallbacks.ReadEntriesCallback() {
@@ -395,7 +395,7 @@ public class PendingReadsManager {
}
};
rangeEntryCache.asyncReadEntry0(lh,
missingOnLeft.startEntry, missingOnLeft.endEntry,
- shouldCacheEntry, readFromLeftCallback,
null);
+ shouldCacheEntry, readFromLeftCallback,
null, false);
} else if (missingOnRight != null) {
AsyncCallbacks.ReadEntriesCallback
readFromRightCallback =
new AsyncCallbacks.ReadEntriesCallback() {
@@ -418,7 +418,7 @@ public class PendingReadsManager {
}
};
rangeEntryCache.asyncReadEntry0(lh,
missingOnRight.startEntry, missingOnRight.endEntry,
- shouldCacheEntry, readFromRightCallback,
null);
+ shouldCacheEntry, readFromRightCallback,
null, false);
}
}
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
index cb006a5f0ce..d52fc8535b5 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
@@ -57,7 +57,7 @@ public class RangeEntryCacheImpl implements EntryCache {
/**
* Overhead per-entry to take into account the envelope.
*/
- private static final long BOOKKEEPER_READ_OVERHEAD_PER_ENTRY = 64;
+ public static final long BOOKKEEPER_READ_OVERHEAD_PER_ENTRY = 64;
private final RangeEntryCacheManagerImpl manager;
final ManagedLedgerImpl ml;
@@ -102,7 +102,7 @@ public class RangeEntryCacheImpl implements EntryCache {
}
@VisibleForTesting
- InflightReadsLimiter getPendingReadsLimiter() {
+ public InflightReadsLimiter getPendingReadsLimiter() {
return manager.getInflightReadsLimiter();
}
@@ -282,7 +282,7 @@ public class RangeEntryCacheImpl implements EntryCache {
public void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry,
boolean shouldCacheEntry,
final ReadEntriesCallback callback, Object ctx) {
try {
- asyncReadEntry0(lh, firstEntry, lastEntry, shouldCacheEntry,
callback, ctx);
+ asyncReadEntry0(lh, firstEntry, lastEntry, shouldCacheEntry,
callback, ctx, true);
} catch (Throwable t) {
log.warn("failed to read entries for {}--{}-{}", lh.getId(),
firstEntry, lastEntry, t);
// invalidate all entries related to ledger from the cache (it
might happen if entry gets corrupt
@@ -295,16 +295,20 @@ public class RangeEntryCacheImpl implements EntryCache {
@SuppressWarnings({ "unchecked", "rawtypes" })
void asyncReadEntry0(ReadHandle lh, long firstEntry, long lastEntry,
boolean shouldCacheEntry,
- final ReadEntriesCallback callback, Object ctx) {
- asyncReadEntry0WithLimits(lh, firstEntry, lastEntry, shouldCacheEntry,
callback, ctx, null);
+ final ReadEntriesCallback callback, Object ctx, boolean
withLimits) {
+ asyncReadEntry0WithLimits(lh, firstEntry, lastEntry, shouldCacheEntry,
callback, ctx, null, withLimits);
}
void asyncReadEntry0WithLimits(ReadHandle lh, long firstEntry, long
lastEntry, boolean shouldCacheEntry,
- final ReadEntriesCallback originalCallback, Object ctx,
InflightReadsLimiter.Handle handle) {
-
- final AsyncCallbacks.ReadEntriesCallback callback =
- handlePendingReadsLimits(lh, firstEntry, lastEntry,
shouldCacheEntry,
- originalCallback, ctx, handle);
+ final ReadEntriesCallback originalCallback, Object ctx,
InflightReadsLimiter.Handle handle,
+ boolean withLimits) {
+ AsyncCallbacks.ReadEntriesCallback callback;
+ if (withLimits) {
+ callback = handlePendingReadsLimits(lh, firstEntry, lastEntry,
shouldCacheEntry, originalCallback, ctx,
+ handle);
+ } else {
+ callback = originalCallback;
+ }
if (callback == null) {
return;
}
@@ -382,7 +386,7 @@ public class RangeEntryCacheImpl implements EntryCache {
}
ml.getExecutor().execute(() -> {
asyncReadEntry0WithLimits(lh, firstEntry, lastEntry,
shouldCacheEntry,
- originalCallback, ctx, newHandle);
+ originalCallback, ctx, newHandle, true);
});
return null;
} else {
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/InflightReadsLimiterIntegrationTest.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/InflightReadsLimiterIntegrationTest.java
new file mode 100644
index 00000000000..b57dea6a5bb
--- /dev/null
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/InflightReadsLimiterIntegrationTest.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.impl;
+
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.doAnswer;
+import io.netty.util.concurrent.DefaultThreadFactory;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.client.api.LedgerEntries;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
+import org.apache.bookkeeper.mledger.impl.cache.InflightReadsLimiter;
+import org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl;
+import org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheManagerImpl;
+import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
+import org.awaitility.Awaitility;
+import org.awaitility.reflect.WhiteboxImpl;
+import org.mockito.Mockito;
+import org.mockito.stubbing.Answer;
+import org.testng.Assert;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+@Slf4j
+public class InflightReadsLimiterIntegrationTest extends
MockedBookKeeperTestCase {
+
+ @DataProvider
+ public Object[][] readMissingCases() {
+ return new Object[][]{
+ {"missRight"},
+ {"missLeft"},
+ {"bothMiss"}
+ };
+ }
+
+ @Test(dataProvider = "readMissingCases")
+ public void testPreciseLimitation(String missingCase) throws Exception {
+ final long start1 = 50;
+ final long start2 = "missLeft".endsWith(missingCase) ||
"bothMiss".equals(missingCase) ? 30 : 50;
+ final long end1 = 99;
+ final long end2 = "missRight".endsWith(missingCase) ||
"bothMiss".equals(missingCase) ? 109 : 99;
+ final HashSet<Long> secondReadEntries = new HashSet<>();
+ if (start2 < start1) {
+ secondReadEntries.add(start2);
+ }
+ if (end2 > end1) {
+ secondReadEntries.add(end1 + 1);
+ }
+ final int readCount1 = (int) (end1 - start1 + 1);
+ final int readCount2 = (int) (end2 - start2 + 1);
+
+ final DefaultThreadFactory threadFactory = new
DefaultThreadFactory(UUID.randomUUID().toString());
+ final ManagedLedgerConfig config = new ManagedLedgerConfig();
+ config.setMaxEntriesPerLedger(100000);
+ ManagedLedgerFactoryConfig factoryConfig = new
ManagedLedgerFactoryConfig();
+ factoryConfig.setCacheEvictionIntervalMs(3600 * 1000);
+ factoryConfig.setManagedLedgerMaxReadsInFlightSize(1000_000);
+ final ManagedLedgerFactoryImpl factory = new
ManagedLedgerFactoryImpl(metadataStore, bkc, factoryConfig);
+ final ManagedLedgerImpl ml = (ManagedLedgerImpl)
factory.open("my_test_ledger", config);
+ final RangeEntryCacheImpl entryCache = (RangeEntryCacheImpl)
ml.entryCache;
+ final RangeEntryCacheManagerImpl rangeEntryCacheManager =
+ (RangeEntryCacheManagerImpl) factory.getEntryCacheManager();
+ final InflightReadsLimiter limiter =
rangeEntryCacheManager.getInflightReadsLimiter();
+ final long totalCapacity =limiter.getRemainingBytes();
+ // final ManagedCursorImpl c1 = (ManagedCursorImpl)
ml.openCursor("c1");
+ for (byte i = 1; i < 127; i++) {
+ log.info("add entry: " + i);
+ ml.addEntry(new byte[]{i});
+ }
+ // Evict cached entries.
+ entryCache.evictEntries(ml.currentLedgerSize);
+ Assert.assertEquals(entryCache.getSize(), 0);
+
+ CountDownLatch readCompleteSignal1 = new CountDownLatch(1);
+ CountDownLatch readCompleteSignal2 = new CountDownLatch(1);
+ CountDownLatch firstReadingStarted = new CountDownLatch(1);
+ LedgerHandle currentLedger = ml.currentLedger;
+ LedgerHandle spyCurrentLedger = Mockito.spy(currentLedger);
+ ml.currentLedger = spyCurrentLedger;
+ Answer answer = invocation -> {
+ long firstEntry = (long) invocation.getArguments()[0];
+ log.info("reading entry: {}", firstEntry);
+ if (firstEntry == start1) {
+ // Wait 3s to make
+ firstReadingStarted.countDown();
+ readCompleteSignal1.await();
+ Object res = invocation.callRealMethod();
+ return res;
+ } else if(secondReadEntries.contains(firstEntry)) {
+ final CompletableFuture res = new CompletableFuture<>();
+ threadFactory.newThread(() -> {
+ try {
+ readCompleteSignal2.await();
+ CompletableFuture<LedgerEntries> future =
+ (CompletableFuture<LedgerEntries>)
invocation.callRealMethod();
+ future.thenAccept(v -> {
+ res.complete(v);
+ }).exceptionally(ex -> {
+ res.completeExceptionally(ex);
+ return null;
+ });
+ } catch (Throwable ex) {
+ res.completeExceptionally(ex);
+ }
+ }).start();
+ return res;
+ } else {
+ return invocation.callRealMethod();
+ }
+ };
+ doAnswer(answer).when(spyCurrentLedger).readAsync(anyLong(),
anyLong());
+
doAnswer(answer).when(spyCurrentLedger).readUnconfirmedAsync(anyLong(),
anyLong());
+
+ // Initialize "entryCache.estimatedEntrySize" to the correct value.
+ Object ctx = new Object();
+ SimpleReadEntriesCallback cb0 = new SimpleReadEntriesCallback();
+ entryCache.asyncReadEntry(spyCurrentLedger, 125, 125, true, cb0, ctx);
+ cb0.entries.join();
+ Long sizePerEntry1 = WhiteboxImpl.getInternalState(entryCache,
"estimatedEntrySize");
+ Assert.assertEquals(sizePerEntry1, 1);
+ Awaitility.await().untilAsserted(() -> {
+ long remainingBytes =limiter.getRemainingBytes();
+ Assert.assertEquals(remainingBytes, totalCapacity);
+ });
+ log.info("remainingBytes 0: {}", limiter.getRemainingBytes());
+
+ // Concurrency reading.
+
+ SimpleReadEntriesCallback cb1 = new SimpleReadEntriesCallback();
+ SimpleReadEntriesCallback cb2 = new SimpleReadEntriesCallback();
+ threadFactory.newThread(() -> {
+ entryCache.asyncReadEntry(spyCurrentLedger, start1, end1, true,
cb1, ctx);
+ }).start();
+ threadFactory.newThread(() -> {
+ try {
+ firstReadingStarted.await();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ entryCache.asyncReadEntry(spyCurrentLedger, start2, end2, true,
cb2, ctx);
+ }).start();
+
+ long bytesAcquired1 = calculateBytesSizeBeforeFirstReading(readCount1
+ readCount2, 1);
+ long remainingBytesExpected1 = totalCapacity - bytesAcquired1;
+ log.info("acquired : {}", bytesAcquired1);
+ log.info("remainingBytesExpected 0 : {}", remainingBytesExpected1);
+ Awaitility.await().untilAsserted(() -> {
+ log.info("remainingBytes 0: {}", limiter.getRemainingBytes());
+ Assert.assertEquals(limiter.getRemainingBytes(),
remainingBytesExpected1);
+ });
+
+ // Complete the read1.
+ Thread.sleep(3000);
+ readCompleteSignal1.countDown();
+ cb1.entries.join();
+ Long sizePerEntry2 = WhiteboxImpl.getInternalState(entryCache,
"estimatedEntrySize");
+ Assert.assertEquals(sizePerEntry2, 1);
+ long bytesAcquired2 = calculateBytesSizeBeforeFirstReading(readCount2,
1);
+ long remainingBytesExpected2 = totalCapacity - bytesAcquired2;
+ log.info("acquired : {}", bytesAcquired2);
+ log.info("remainingBytesExpected 1: {}", remainingBytesExpected2);
+ Awaitility.await().untilAsserted(() -> {
+ log.info("remainingBytes 1: {}", limiter.getRemainingBytes());
+ Assert.assertEquals(limiter.getRemainingBytes(),
remainingBytesExpected2);
+ });
+
+ readCompleteSignal2.countDown();
+ cb2.entries.join();
+ Long sizePerEntry3 = WhiteboxImpl.getInternalState(entryCache,
"estimatedEntrySize");
+ Assert.assertEquals(sizePerEntry3, 1);
+ Awaitility.await().untilAsserted(() -> {
+ long remainingBytes = limiter.getRemainingBytes();
+ log.info("remainingBytes 2: {}", remainingBytes);
+ Assert.assertEquals(remainingBytes, totalCapacity);
+ });
+ // cleanup
+ ml.delete();
+ factory.shutdown();
+ }
+
+ private long calculateBytesSizeBeforeFirstReading(int entriesCount, int
perEntrySize) {
+ return entriesCount * (perEntrySize +
RangeEntryCacheImpl.BOOKKEEPER_READ_OVERHEAD_PER_ENTRY);
+ }
+
+ class SimpleReadEntriesCallback implements
AsyncCallbacks.ReadEntriesCallback {
+
+ CompletableFuture<List<Byte>> entries = new CompletableFuture<>();
+
+ @Override
+ public void readEntriesComplete(List<Entry> entriesRead, Object ctx) {
+ List<Byte> list = new ArrayList<>(entriesRead.size());
+ for (Entry entry : entriesRead) {
+ byte b = entry.getDataBuffer().readByte();
+ list.add(b);
+ entry.release();
+ }
+ this.entries.complete(list);
+ }
+
+ @Override
+ public void readEntriesFailed(ManagedLedgerException exception, Object
ctx) {
+ this.entries.completeExceptionally(exception);
+ }
+ }
+}
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManagerTest.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManagerTest.java
index 01976f648ab..383568c17e8 100644
---
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManagerTest.java
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManagerTest.java
@@ -108,7 +108,7 @@ public class PendingReadsManagerTest {
return null;
}
}).when(rangeEntryCache).asyncReadEntry0(any(), anyLong(), anyLong(),
- anyBoolean(), any(), any());
+ anyBoolean(), any(), any(), anyBoolean());
lh = mock(ReadHandle.class);
ml = mock(ManagedLedgerImpl.class);