This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new b6cfecce5f3 [fix][broker Fix bug in RangeCache where different
instance of the key wouldn't ever match (#23903)
b6cfecce5f3 is described below
commit b6cfecce5f3a1eecbf6f5df81cb835fbbfe35980
Author: Lari Hotari <[email protected]>
AuthorDate: Wed Jan 29 01:23:52 2025 +0200
[fix][broker Fix bug in RangeCache where different instance of the key
wouldn't ever match (#23903)
---
.../apache/bookkeeper/mledger/util/RangeCache.java | 109 ++++++--
.../mledger/impl/ManagedLedgerBkTest.java | 115 +++++++-
.../bookkeeper/mledger/util/RangeCacheTest.java | 19 +-
.../api/KeySharedSubscriptionBrokerCacheTest.java | 308 +++++++++++++++++++++
pulsar-broker/src/test/resources/log4j2.xml | 9 +
.../coordinator/impl/TxnLogBufferedWriterTest.java | 9 +
.../coordinator/test/MockedBookKeeperTestCase.java | 6 +-
7 files changed, 543 insertions(+), 32 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/RangeCache.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/RangeCache.java
index 2f2b161a306..0de6f943622 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/RangeCache.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/RangeCache.java
@@ -106,7 +106,39 @@ public class RangeCache<Key extends Comparable<Key>, Value
extends ValueWithKeyV
return localKey;
}
+ /**
+ * Get the value associated with the key. Returns null if the key does
not match the key.
+ *
+ * @param key the key to match
+ * @return the value associated with the key, or null if the value has
already been recycled or the key does not
+ * match
+ */
V getValue(K key) {
+ return getValueInternal(key, false);
+ }
+
+ /**
+ * Get the value associated with the Map.Entry's key and value. Exact
instance of the key is required to match.
+ * @param entry the entry which contains the key and {@link
EntryWrapper} value to get the value from
+ * @return the value associated with the key, or null if the value has
already been recycled or the key does not
+ * exactly match the same instance
+ */
+ static <K, V> V getValueMatchingMapEntry(Map.Entry<K, EntryWrapper<K,
V>> entry) {
+ return entry.getValue().getValueInternal(entry.getKey(), true);
+ }
+
+ /**
+ * Get the value associated with the key. Returns null if the key does
not match the key associated with the
+ * value.
+ *
+ * @param key the key to match
+ * @param requireSameKeyInstance when true, the matching will be
restricted to exactly the same instance of the
+ * key as the one stored in the wrapper.
This is used to avoid any races
+ * when retrieving or removing the
entries from the cache when the key and value
+ * instances are available.
+ * @return the value associated with the key, or null if the key does
not match
+ */
+ private V getValueInternal(K key, boolean requireSameKeyInstance) {
long stamp = lock.tryOptimisticRead();
K localKey = this.key;
V localValue = this.value;
@@ -116,7 +148,11 @@ public class RangeCache<Key extends Comparable<Key>, Value
extends ValueWithKeyV
localValue = this.value;
lock.unlockRead(stamp);
}
- if (localKey != key) {
+ // check that the given key matches the key associated with the
value in the entry
+ // this is used to detect if the entry has already been recycled
and contains another key
+ // when requireSameKeyInstance is true, the key must be exactly
the same instance as the one stored in the
+ // entry to match
+ if (localKey != key && (requireSameKeyInstance || localKey == null
|| !localKey.equals(key))) {
return null;
}
return localValue;
@@ -236,34 +272,45 @@ public class RangeCache<Key extends Comparable<Key>,
Value extends ValueWithKeyV
* The caller is responsible for releasing the reference.
*/
public Value get(Key key) {
- return getValue(key, entries.get(key));
+ return getValueFromWrapper(key, entries.get(key));
}
- private Value getValue(Key key, EntryWrapper<Key, Value> valueWrapper) {
+ private Value getValueFromWrapper(Key key, EntryWrapper<Key, Value>
valueWrapper) {
if (valueWrapper == null) {
return null;
} else {
Value value = valueWrapper.getValue(key);
- if (value == null) {
- // the wrapper has been recycled and contains another key
- return null;
- }
- try {
- value.retain();
- } catch (IllegalReferenceCountException e) {
- // Value was already deallocated
- return null;
- }
- // check that the value matches the key and that there's at least
2 references to it since
- // the cache should be holding one reference and a new reference
was just added in this method
- if (value.refCnt() > 1 && value.matchesKey(key)) {
- return value;
- } else {
- // Value or IdentityWrapper was recycled and already contains
another value
- // release the reference added in this method
- value.release();
- return null;
- }
+ return getRetainedValueMatchingKey(key, value);
+ }
+ }
+
+ private Value getValueMatchingEntry(Map.Entry<Key, EntryWrapper<Key,
Value>> entry) {
+ Value valueMatchingEntry =
EntryWrapper.getValueMatchingMapEntry(entry);
+ return getRetainedValueMatchingKey(entry.getKey(), valueMatchingEntry);
+ }
+
+ // validates that the value matches the key and that the value has not
been recycled
+ // which are possible due to the lack of exclusive locks in the cache and
the use of reference counted objects
+ private Value getRetainedValueMatchingKey(Key key, Value value) {
+ if (value == null) {
+ // the wrapper has been recycled and contains another key
+ return null;
+ }
+ try {
+ value.retain();
+ } catch (IllegalReferenceCountException e) {
+ // Value was already deallocated
+ return null;
+ }
+ // check that the value matches the key and that there's at least 2
references to it since
+ // the cache should be holding one reference and a new reference was
just added in this method
+ if (value.refCnt() > 1 && value.matchesKey(key)) {
+ return value;
+ } else {
+ // Value or IdentityWrapper was recycled and already contains
another value
+ // release the reference added in this method
+ value.release();
+ return null;
}
}
@@ -280,7 +327,7 @@ public class RangeCache<Key extends Comparable<Key>, Value
extends ValueWithKeyV
// Return the values of the entries found in cache
for (Map.Entry<Key, EntryWrapper<Key, Value>> entry :
entries.subMap(first, true, last, true).entrySet()) {
- Value value = getValue(entry.getKey(), entry.getValue());
+ Value value = getValueMatchingEntry(entry);
if (value != null) {
values.add(value);
}
@@ -297,6 +344,9 @@ public class RangeCache<Key extends Comparable<Key>, Value
extends ValueWithKeyV
* @return an pair of ints, containing the number of removed entries and
the total size
*/
public Pair<Integer, Long> removeRange(Key first, Key last, boolean
lastInclusive) {
+ if (log.isDebugEnabled()) {
+ log.debug("Removing entries in range [{}, {}], lastInclusive: {}",
first, last, lastInclusive);
+ }
RemovalCounters counters = RemovalCounters.create();
Map<Key, EntryWrapper<Key, Value>> subMap = entries.subMap(first,
true, last, lastInclusive);
for (Map.Entry<Key, EntryWrapper<Key, Value>> entry :
subMap.entrySet()) {
@@ -320,7 +370,7 @@ public class RangeCache<Key extends Comparable<Key>, Value
extends ValueWithKeyV
boolean skipInvalid,
Predicate<Value> removeCondition) {
Key key = entry.getKey();
EntryWrapper<Key, Value> entryWrapper = entry.getValue();
- Value value = entryWrapper.getValue(key);
+ Value value = getValueMatchingEntry(entry);
if (value == null) {
// the wrapper has already been recycled and contains another key
if (!skipInvalid) {
@@ -404,6 +454,9 @@ public class RangeCache<Key extends Comparable<Key>, Value
extends ValueWithKeyV
* @return a pair containing the number of entries evicted and their total
size
*/
public Pair<Integer, Long> evictLeastAccessedEntries(long minSize) {
+ if (log.isDebugEnabled()) {
+ log.debug("Evicting entries to reach a minimum size of {}",
minSize);
+ }
checkArgument(minSize > 0);
RemovalCounters counters = RemovalCounters.create();
while (counters.removedSize < minSize &&
!Thread.currentThread().isInterrupted()) {
@@ -422,6 +475,9 @@ public class RangeCache<Key extends Comparable<Key>, Value
extends ValueWithKeyV
* @return the tota
*/
public Pair<Integer, Long> evictLEntriesBeforeTimestamp(long maxTimestamp) {
+ if (log.isDebugEnabled()) {
+ log.debug("Evicting entries with timestamp <= {}", maxTimestamp);
+ }
RemovalCounters counters = RemovalCounters.create();
while (!Thread.currentThread().isInterrupted()) {
Map.Entry<Key, EntryWrapper<Key, Value>> entry =
entries.firstEntry();
@@ -453,6 +509,9 @@ public class RangeCache<Key extends Comparable<Key>, Value
extends ValueWithKeyV
* @return size of removed entries
*/
public Pair<Integer, Long> clear() {
+ if (log.isDebugEnabled()) {
+ log.debug("Clearing the cache with {} entries and size {}",
entries.size(), size.get());
+ }
RemovalCounters counters = RemovalCounters.create();
while (!Thread.currentThread().isInterrupted()) {
Map.Entry<Key, EntryWrapper<Key, Value>> entry =
entries.firstEntry();
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java
index e23937afea2..574ed2f3251 100644
---
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java
@@ -24,24 +24,28 @@ import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
-
import com.fasterxml.jackson.databind.ObjectMapper;
+import io.netty.buffer.ByteBuf;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Future;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
-
+import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.BookKeeperTestClient;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.api.DigestType;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCallback;
import org.apache.bookkeeper.mledger.Entry;
@@ -53,18 +57,17 @@ import
org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerAlready
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.PositionFactory;
import org.apache.bookkeeper.mledger.impl.cache.EntryCacheManager;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.bookkeeper.mledger.util.ThrowableToStringUtil;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats;
+import org.apache.pulsar.common.util.FutureUtil;
import org.awaitility.Awaitility;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
-import io.netty.buffer.ByteBuf;
-import lombok.Cleanup;
-
@Slf4j
public class ManagedLedgerBkTest extends BookKeeperClusterTestCase {
@@ -241,6 +244,108 @@ public class ManagedLedgerBkTest extends
BookKeeperClusterTestCase {
assertEquals(factory.getMbean().getNumberOfCacheEvictions(), 0);
}
+ @Test
+ public void verifyAsyncReadEntryUsingCache() throws Exception {
+ ManagedLedgerFactoryConfig config = new ManagedLedgerFactoryConfig();
+
+ config.setMaxCacheSize(100 * 1024 * 1024);
+ config.setCacheEvictionTimeThresholdMillis(10000);
+ config.setCacheEvictionIntervalMs(10000);
+
+ @Cleanup("shutdown")
+ ManagedLedgerFactoryImpl factory = new
ManagedLedgerFactoryImpl(metadataStore, bkc, config);
+
+ ManagedLedgerConfig conf = new ManagedLedgerConfig();
+ conf.setEnsembleSize(2).setAckQuorumSize(2).setMetadataEnsembleSize(2)
+ .setRetentionSizeInMB(-1).setRetentionTime(-1,
TimeUnit.MILLISECONDS);
+ final ManagedLedgerImpl ledger = (ManagedLedgerImpl)
factory.open("my-ledger" + testName, conf);
+
+ int NumProducers = 5;
+ int NumConsumers = 10;
+
+ final AtomicBoolean done = new AtomicBoolean();
+ final CyclicBarrier barrier = new CyclicBarrier(NumProducers +
NumConsumers + 1);
+
+ List<Future<?>> futures = new ArrayList();
+ List<Position> positions = new CopyOnWriteArrayList<>();
+
+ for (int i = 0; i < NumProducers; i++) {
+ futures.add(executor.submit(() -> {
+ try {
+ // wait for all threads to be ready to start at once
+ barrier.await();
+ while (!done.get()) {
+ Position position =
ledger.addEntry("entry".getBytes());
+ positions.add(position);
+ Thread.sleep(1);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw FutureUtil.wrapToCompletionException(e);
+ }
+ }));
+ }
+
+ // create a dummy cursor since caching happens only when there are
active consumers
+ ManagedCursor cursor = ledger.openCursor("dummy");
+
+ for (int i = 0; i < NumConsumers; i++) {
+ futures.add(executor.submit(() -> {
+ try {
+ // wait for all threads to be ready to start at once
+ barrier.await();
+ while (!done.get()) {
+ if (positions.isEmpty()) {
+ Thread.sleep(1);
+ continue;
+ }
+ // Simulate a replay queue read pattern where
individual entries are read
+ Position randomPosition =
positions.get(ThreadLocalRandom.current().nextInt(positions.size()));
+ // Clone the original instance so that another
instance is used in the asyncReadEntry call
+ // This is to test that keys are compared by .equals
and not by reference under the covers
+ randomPosition =
PositionFactory.create(randomPosition);
+ CompletableFuture<Void> future = new
CompletableFuture<>();
+ ledger.asyncReadEntry(randomPosition, new
AsyncCallbacks.ReadEntryCallback() {
+ @Override
+ public void readEntryComplete(Entry entry, Object
ctx) {
+ entry.release();
+ future.complete(null);
+ }
+
+ @Override
+ public void readEntryFailed(ManagedLedgerException
exception, Object ctx) {
+ future.completeExceptionally(exception);
+ }
+ }, null);
+ future.get();
+ Thread.sleep(2);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw FutureUtil.wrapToCompletionException(e);
+ }
+ }));
+ }
+
+ // trigger all worker threads at once to continue from the barrier
+ barrier.await();
+
+ int testDurationSeconds = 3;
+ Thread.sleep(testDurationSeconds * 1000);
+
+ done.set(true);
+ for (Future<?> future : futures) {
+ future.get();
+ }
+
+ factory.getMbean().refreshStats(testDurationSeconds, TimeUnit.SECONDS);
+
+ assertTrue(factory.getMbean().getCacheHitsRate() > 0.0);
+ assertEquals(factory.getMbean().getCacheMissesRate(), 0.0);
+ assertTrue(factory.getMbean().getCacheHitsThroughput() > 0.0);
+ assertEquals(factory.getMbean().getNumberOfCacheEvictions(), 0);
+ }
+
@Test
public void testSimple() throws Exception {
@Cleanup("shutdown")
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/RangeCacheTest.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/RangeCacheTest.java
index 4bcf2cc6c4e..aa13d4b8e34 100644
---
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/RangeCacheTest.java
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/RangeCacheTest.java
@@ -20,6 +20,7 @@ package org.apache.bookkeeper.mledger.util;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotSame;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
@@ -338,4 +339,20 @@ public class RangeCacheTest {
cache.clear();
assertEquals(cache.getNumberOfEntries(), 0);
}
-}
+
+ @Test
+ public void testGetKeyWithDifferentInstance() {
+ RangeCache<Integer, RefString> cache = new RangeCache<>();
+ Integer key = 129;
+ cache.put(key, new RefString("129"));
+ // create a different instance of the key
+ Integer key2 = Integer.valueOf(129);
+ // key and key2 are different instances but they are equal
+ assertNotSame(key, key2);
+ assertEquals(key, key2);
+ // get the value using key2
+ RefString s = cache.get(key2);
+ // the value should be found
+ assertEquals(s.s, "129");
+ }
+}
\ No newline at end of file
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionBrokerCacheTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionBrokerCacheTest.java
new file mode 100644
index 00000000000..9fca95e2e87
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionBrokerCacheTest.java
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.pulsar.broker.BrokerTestUtil.newUniqueName;
+import static org.assertj.core.api.SoftAssertions.assertSoftly;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import lombok.Cleanup;
+import lombok.SneakyThrows;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.ManagedLedgerFactoryMXBean;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
+import org.apache.pulsar.broker.service.StickyKeyDispatcher;
+import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
+import org.apache.pulsar.tests.KeySharedImplementationType;
+import org.awaitility.Awaitility;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Factory;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker-impl")
+public class KeySharedSubscriptionBrokerCacheTest extends ProducerConsumerBase
{
+ private static final Logger log =
LoggerFactory.getLogger(KeySharedSubscriptionBrokerCacheTest.class);
+ private static final String SUBSCRIPTION_NAME = "key_shared";
+ private final KeySharedImplementationType implementationType;
+
+ // Comment out the next line (Factory annotation) to run tests manually in
IntelliJ, one-by-one
+ @Factory
+ public static Object[] createTestInstances() {
+ return
KeySharedImplementationType.generateTestInstances(KeySharedSubscriptionBrokerCacheTest::new);
+ }
+
+ public KeySharedSubscriptionBrokerCacheTest() {
+ // set the default implementation type for manual running in IntelliJ
+ this(KeySharedImplementationType.PIP379);
+ }
+
+ public KeySharedSubscriptionBrokerCacheTest(KeySharedImplementationType
implementationType) {
+ this.implementationType = implementationType;
+ }
+
+ @DataProvider(name = "currentImplementationType")
+ public Object[] currentImplementationType() {
+ return new Object[]{ implementationType };
+ }
+
+ @BeforeClass(alwaysRun = true)
+ @Override
+ protected void setup() throws Exception {
+ super.internalSetup();
+ super.producerBaseSetup();
+ }
+
+ @Override
+ protected void doInitConf() throws Exception {
+ super.doInitConf();
+
conf.setSubscriptionKeySharedUseClassicPersistentImplementation(implementationType.classic);
+
conf.setSubscriptionSharedUseClassicPersistentImplementation(implementationType.classic);
+ conf.setUnblockStuckSubscriptionEnabled(false);
+ conf.setSubscriptionKeySharedUseConsistentHashing(true);
+ conf.setManagedLedgerCacheSizeMB(100);
+
+ // configure to evict entries after 30 seconds so that we can test
retrieval from cache
+ conf.setManagedLedgerCacheEvictionTimeThresholdMillis(30000);
+ conf.setManagedLedgerCacheEvictionIntervalMs(30000);
+
+ // Important: this is currently necessary to make use of cache for
replay queue reads
+ conf.setCacheEvictionByMarkDeletedPosition(true);
+
+ conf.setManagedLedgerMaxReadsInFlightSizeInMB(100);
+ conf.setDispatcherRetryBackoffInitialTimeInMs(0);
+ conf.setDispatcherRetryBackoffMaxTimeInMs(0);
+ conf.setKeySharedUnblockingIntervalMs(0);
+ }
+
+ @AfterClass(alwaysRun = true)
+ @Override
+ protected void cleanup() throws Exception {
+ super.internalCleanup();
+ }
+
+ @AfterMethod(alwaysRun = true)
+ public void resetAfterMethod() throws Exception {
+ List<String> list = admin.namespaces().getTopics("public/default");
+ for (String topicName : list){
+ if (!pulsar.getBrokerService().isSystemTopic(topicName)) {
+ admin.topics().delete(topicName, false);
+ }
+ }
+ pulsarTestContext.getMockBookKeeper().setReadHandleInterceptor(null);
+ }
+
+ // Use a fixed seed to make the tests using random values deterministic
+ // When a test fails, it's possible to re-run it to reproduce the issue
+ private static final Random random = new Random(1);
+
+ private Producer<Integer> createProducer(String topic, boolean
enableBatch) throws PulsarClientException {
+ Producer<Integer> producer = null;
+ if (enableBatch) {
+ producer = pulsarClient.newProducer(Schema.INT32)
+ .topic(topic)
+ .enableBatching(true)
+ .maxPendingMessages(2001)
+ .batcherBuilder(BatcherBuilder.KEY_BASED)
+ .create();
+ } else {
+ producer = pulsarClient.newProducer(Schema.INT32)
+ .topic(topic)
+ .maxPendingMessages(2001)
+ .enableBatching(false)
+ .create();
+ }
+ return producer;
+ }
+
+ private StickyKeyConsumerSelector getSelector(String topic, String
subscription) {
+ return getStickyKeyDispatcher(topic, subscription).getSelector();
+ }
+
+ @SneakyThrows
+ private StickyKeyDispatcher getStickyKeyDispatcher(String topic, String
subscription) {
+ Topic t =
pulsar.getBrokerService().getTopicIfExists(topic).get().get();
+ PersistentSubscription sub = (PersistentSubscription)
t.getSubscription(subscription);
+ StickyKeyDispatcher dispatcher = (StickyKeyDispatcher)
sub.getDispatcher();
+ return dispatcher;
+ }
+
+ @Test(dataProvider = "currentImplementationType", invocationCount = 1)
+ public void testReplayQueueReadsGettingCached(KeySharedImplementationType
impl) throws Exception {
+ String topic = newUniqueName("testReplayQueueReadsGettingCached");
+ int numberOfKeys = 100;
+ long pauseTime = 100L;
+
+ @Cleanup
+ Producer<Integer> producer = createProducer(topic, false);
+
+ // create a consumer and close it to create a subscription
+ pulsarClient.newConsumer(Schema.INT32)
+ .topic(topic)
+ .subscriptionName(SUBSCRIPTION_NAME)
+ .subscriptionType(SubscriptionType.Key_Shared)
+ .subscribe()
+ .close();
+
+ Set<Integer> remainingMessageValues = Collections.synchronizedSet(new
HashSet<>());
+ BlockingQueue<Pair<Consumer<Integer>, Message<Integer>>>
unackedMessages = new LinkedBlockingQueue<>();
+ AtomicBoolean c2MessagesShouldBeUnacked = new AtomicBoolean(true);
+ Set<String> keysForC2 = new HashSet<>();
+ AtomicLong lastMessageTimestamp = new
AtomicLong(System.currentTimeMillis());
+
+ MessageListener<Integer> messageHandler = (consumer, msg) -> {
+ lastMessageTimestamp.set(System.currentTimeMillis());
+ synchronized (this) {
+ String key = msg.getKey();
+ if (c2MessagesShouldBeUnacked.get() &&
keysForC2.contains(key)) {
+ unackedMessages.add(Pair.of(consumer, msg));
+ return;
+ }
+ remainingMessageValues.remove(msg.getValue());
+ consumer.acknowledgeAsync(msg);
+ }
+ };
+
+
pulsarTestContext.getMockBookKeeper().setReadHandleInterceptor((ledgerId,
firstEntry, lastEntry, entries) -> {
+ log.error("Attempting to read from BK when cache should be used.
{}:{} to {}:{}", ledgerId, firstEntry,
+ ledgerId, lastEntry);
+ return CompletableFuture.failedFuture(
+ new ManagedLedgerException.NonRecoverableLedgerException(
+ "Should not read from BK since cache should be
used."));
+ });
+
+ // Adding a new consumer.
+ @Cleanup
+ Consumer<Integer> c1 = pulsarClient.newConsumer(Schema.INT32)
+ .topic(topic)
+ .consumerName("c1")
+ .subscriptionName(SUBSCRIPTION_NAME)
+ .subscriptionType(SubscriptionType.Key_Shared)
+ .messageListener(messageHandler)
+ .subscribe();
+
+ @Cleanup
+ Consumer<Integer> c2 = pulsarClient.newConsumer(Schema.INT32)
+ .topic(topic)
+ .consumerName("c2")
+ .subscriptionName(SUBSCRIPTION_NAME)
+ .subscriptionType(SubscriptionType.Key_Shared)
+ .messageListener(messageHandler)
+ .subscribe();
+
+ @Cleanup
+ Consumer<Integer> c3 = pulsarClient.newConsumer(Schema.INT32)
+ .topic(topic)
+ .consumerName("c3")
+ .subscriptionName(SUBSCRIPTION_NAME)
+ .subscriptionType(SubscriptionType.Key_Shared)
+ .messageListener(messageHandler)
+ .subscribe();
+
+ StickyKeyDispatcher dispatcher = getStickyKeyDispatcher(topic,
SUBSCRIPTION_NAME);
+ StickyKeyConsumerSelector selector = dispatcher.getSelector();
+
+ // find keys that will be assigned to c2
+ for (int i = 0; i < numberOfKeys; i++) {
+ String key = String.valueOf(i);
+ byte[] keyBytes = key.getBytes(UTF_8);
+ int hash = selector.makeStickyKeyHash(keyBytes);
+ if (selector.select(hash).consumerName().equals("c2")) {
+ keysForC2.add(key);
+ }
+ }
+
+ // close c2
+ c2.close();
+
+ // produce messages with random keys
+ for (int i = 0; i < 1000; i++) {
+ String key = String.valueOf(random.nextInt(numberOfKeys));
+ //log.info("Producing message with key: {} value: {}", key, i);
+ remainingMessageValues.add(i);
+ producer.newMessage()
+ .key(key)
+ .value(i)
+ .send();
+ }
+
+ // reconnect c2
+ c2 = pulsarClient.newConsumer(Schema.INT32)
+ .topic(topic)
+ .consumerName("c2")
+ .subscriptionName(SUBSCRIPTION_NAME)
+ .subscriptionType(SubscriptionType.Key_Shared)
+ .messageListener(messageHandler)
+ .startPaused(true)
+ .subscribe();
+
+ // ack the unacked messages to unblock c2 keys
+ c2MessagesShouldBeUnacked.set(false);
+ Pair<Consumer<Integer>, Message<Integer>> consumerMessagePair;
+ while ((consumerMessagePair = unackedMessages.poll()) != null) {
+ messageHandler.received(consumerMessagePair.getLeft(),
consumerMessagePair.getRight());
+ }
+
+ // produce more messages with random keys
+ for (int i = 0; i < 1000; i++) {
+ String key = String.valueOf(random.nextInt(numberOfKeys));
+ //log.info("Producing message with key: {} value: {}", key, i);
+ remainingMessageValues.add(i);
+ producer.newMessage()
+ .key(key)
+ .value(i)
+ .send();
+ }
+
+ c2.resume();
+
+ Awaitility.await().atMost(Duration.ofSeconds(10)).until(() -> {
+ return remainingMessageValues.isEmpty()
+ || System.currentTimeMillis() - lastMessageTimestamp.get()
> 50 * pauseTime;
+ });
+
+ try {
+ assertSoftly(softly -> {
+
softly.assertThat(remainingMessageValues).as("remainingMessageValues").isEmpty();
+ ManagedLedgerFactoryMXBean cacheStats =
pulsar.getDefaultManagedLedgerFactory().getCacheStats();
+ softly.assertThat(cacheStats.getCacheHitsTotal()).as("cache
hits").isGreaterThan(0);
+ softly.assertThat(cacheStats.getCacheMissesTotal()).as("cache
misses").isEqualTo(0);
+
softly.assertThat(cacheStats.getNumberOfCacheEvictions()).as("cache
evictions").isEqualTo(0);
+ });
+ } finally {
+ logTopicStats(topic);
+ }
+ }
+}
diff --git a/pulsar-broker/src/test/resources/log4j2.xml
b/pulsar-broker/src/test/resources/log4j2.xml
index 7b3cd6a04fc..b348a1d04b7 100644
--- a/pulsar-broker/src/test/resources/log4j2.xml
+++ b/pulsar-broker/src/test/resources/log4j2.xml
@@ -53,5 +53,14 @@
<AppenderRef ref="CONSOLE"/>
</Logger>
-->
+
+ <!-- Uncomment the following loggers for debugging broker cache related
classes
+ <Logger name="org.apache.bookkeeper.mledger.impl.cache" level="DEBUG"
additivity="false">
+ <AppenderRef ref="CONSOLE"/>
+ </Logger>
+ <Logger name="org.apache.bookkeeper.mledger.util.RangeCache" level="DEBUG"
additivity="false">
+ <AppenderRef ref="CONSOLE"/>
+ </Logger>
+ -->
</Loggers>
</Configuration>
diff --git
a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterTest.java
b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterTest.java
index 31472794778..f7c343d7421 100644
---
a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterTest.java
+++
b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterTest.java
@@ -55,6 +55,7 @@ import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.PositionFactory;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
@@ -127,6 +128,14 @@ public class TxnLogBufferedWriterTest extends
MockedBookKeeperTestCase {
};
}
+ @Override
+ protected ManagedLedgerFactoryConfig createManagedLedgerFactoryConfig() {
+ ManagedLedgerFactoryConfig managedLedgerFactoryConfig =
super.createManagedLedgerFactoryConfig();
+ // disable the broker cache so that assertAllByteBufHasBeenReleased
can work correctly.
+ managedLedgerFactoryConfig.setMaxCacheSize(0);
+ return managedLedgerFactoryConfig;
+ }
+
/**
* Tests all operations from write to callback, including these step:
* 1. Write many data.
diff --git
a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/test/MockedBookKeeperTestCase.java
b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/test/MockedBookKeeperTestCase.java
index ac5aa3bd892..e3e6945620c 100644
---
a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/test/MockedBookKeeperTestCase.java
+++
b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/test/MockedBookKeeperTestCase.java
@@ -82,10 +82,14 @@ public abstract class MockedBookKeeperTestCase {
throw e;
}
- ManagedLedgerFactoryConfig conf = new ManagedLedgerFactoryConfig();
+ ManagedLedgerFactoryConfig conf = createManagedLedgerFactoryConfig();
factory = new ManagedLedgerFactoryImpl(metadataStore, bkc, conf);
}
+ protected ManagedLedgerFactoryConfig createManagedLedgerFactoryConfig() {
+ return new ManagedLedgerFactoryConfig();
+ }
+
@AfterMethod(alwaysRun = true)
public void tearDown(Method method) {
try {