This is an automated email from the ASF dual-hosted git repository. xiangying pushed a commit to branch InflightReadsLimiter in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit eeb80e1092b6d289f1adbfd65b87cecc9b29b900 Author: xiangying <[email protected]> AuthorDate: Fri Sep 15 17:27:32 2023 +0800 [fix][broker][branch-2.10] limit the memory used by reads end-to-end --- .../mledger/impl/cache/InflightReadsLimiter.java | 2 +- .../bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java | 14 +++++++------- .../mledger/impl/cache/InflightReadsLimiterTest.java | 2 +- .../persistent/PersistentDispatcherMultipleConsumers.java | 9 +++------ .../PersistentStreamingDispatcherMultipleConsumers.java | 7 ++----- 5 files changed, 14 insertions(+), 20 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiter.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiter.java index b946dc09a0c..f3848b6ddd9 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiter.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiter.java @@ -1,4 +1,4 @@ -/* +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java index 5401ae3563c..3e7401bc512 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java @@ -29,7 +29,6 @@ import io.netty.buffer.PooledByteBufAllocator; import java.util.Collection; import java.util.Iterator; import java.util.List; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicInteger; import org.apache.bookkeeper.client.api.BKException; import org.apache.bookkeeper.client.api.LedgerEntry; @@ -72,7 +71,6 @@ public class RangeEntryCacheImpl implements EntryCache { public RangeEntryCacheImpl(RangeEntryCacheManagerImpl manager, ManagedLedgerImpl ml, boolean copyEntries) { this.manager = manager; this.ml = ml; - this.pendingReadsManager = new PendingReadsManager(this); this.interceptor = ml.getManagedLedgerInterceptor(); this.readEntryTimeoutMillis = getManagedLedgerConfig().getReadEntryTimeoutSeconds(); this.entries = new RangeCache<>(EntryImpl::getLength, EntryImpl::getTimestamp); @@ -281,14 +279,14 @@ public class RangeEntryCacheImpl implements EntryCache { @SuppressWarnings({ "unchecked", "rawtypes" }) private void asyncReadEntry0(ReadHandle lh, long firstEntry, long lastEntry, boolean isSlowestReader, final ReadEntriesCallback callback, Object ctx) { - asyncReadEntry0WithLimits(lh, firstEntry, lastEntry, shouldCacheEntry, callback, ctx, null); + asyncReadEntry0WithLimits(lh, firstEntry, lastEntry, isSlowestReader, callback, ctx, null); } - void asyncReadEntry0WithLimits(ReadHandle lh, long firstEntry, long lastEntry, boolean shouldCacheEntry, + void asyncReadEntry0WithLimits(ReadHandle lh, long firstEntry, long lastEntry, boolean isSlowestReader, final ReadEntriesCallback originalCallback, Object ctx, InflightReadsLimiter.Handle handle) { final AsyncCallbacks.ReadEntriesCallback callback = - handlePendingReadsLimits(lh, firstEntry, lastEntry, shouldCacheEntry, + handlePendingReadsLimits(lh, firstEntry, lastEntry, isSlowestReader, originalCallback, ctx, handle); if (callback == null) { return; @@ -371,8 +369,10 @@ public class RangeEntryCacheImpl implements EntryCache { private AsyncCallbacks.ReadEntriesCallback handlePendingReadsLimits(ReadHandle lh, long firstEntry, long lastEntry, boolean shouldCacheEntry, - AsyncCallbacks.ReadEntriesCallback originalCallback, - Object ctx, InflightReadsLimiter.Handle handle) { + AsyncCallbacks.ReadEntriesCallback + originalCallback, + Object ctx, + InflightReadsLimiter.Handle handle) { InflightReadsLimiter pendingReadsLimiter = getPendingReadsLimiter(); if (pendingReadsLimiter.isDisabled()) { return originalCallback; diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiterTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiterTest.java index 2b69581ca2c..e1345129a21 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiterTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiterTest.java @@ -1,4 +1,4 @@ -/* +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index 86db5ebc734..23f6ca96787 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -527,12 +527,9 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul } long size = entries.stream().mapToLong(Entry::getLength).sum(); updatePendingBytesToDispatch(size); - if (sendMessagesToConsumers(readType, entries)) { - updatePendingBytesToDispatch(-size); - readMoreEntriesAsync(); - } else { - updatePendingBytesToDispatch(-size); - } } + sendMessagesToConsumers(readType, entries); + updatePendingBytesToDispatch(-size); + } protected final synchronized void sendMessagesToConsumers(ReadType readType, List<Entry> entries) { sendInProgress = true; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java index 2c90e899efc..6f826cfda43 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java @@ -93,11 +93,8 @@ public class PersistentStreamingDispatcherMultipleConsumers extends PersistentDi .getNextValidPosition((PositionImpl) entry.getPosition())); long size = entry.getLength(); updatePendingBytesToDispatch(size); - if (sendMessagesToConsumers(readType, Lists.newArrayList(entry))) { - readMoreEntriesAsync(); - } else { - updatePendingBytesToDispatch(-size); - } + sendMessagesToConsumers(readType, Lists.newArrayList(entry)); + updatePendingBytesToDispatch(-size); ctx.recycle(); }
