This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new b6cfecce5f3 [fix][broker] Fix bug in RangeCache where different 
instance of the key wouldn't ever match (#23903)
b6cfecce5f3 is described below

commit b6cfecce5f3a1eecbf6f5df81cb835fbbfe35980
Author: Lari Hotari <[email protected]>
AuthorDate: Wed Jan 29 01:23:52 2025 +0200

    [fix][broker] Fix bug in RangeCache where different instance of the key 
wouldn't ever match (#23903)
---
 .../apache/bookkeeper/mledger/util/RangeCache.java | 109 ++++++--
 .../mledger/impl/ManagedLedgerBkTest.java          | 115 +++++++-
 .../bookkeeper/mledger/util/RangeCacheTest.java    |  19 +-
 .../api/KeySharedSubscriptionBrokerCacheTest.java  | 308 +++++++++++++++++++++
 pulsar-broker/src/test/resources/log4j2.xml        |   9 +
 .../coordinator/impl/TxnLogBufferedWriterTest.java |   9 +
 .../coordinator/test/MockedBookKeeperTestCase.java |   6 +-
 7 files changed, 543 insertions(+), 32 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/RangeCache.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/RangeCache.java
index 2f2b161a306..0de6f943622 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/RangeCache.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/RangeCache.java
@@ -106,7 +106,39 @@ public class RangeCache<Key extends Comparable<Key>, Value 
extends ValueWithKeyV
             return localKey;
         }
 
+        /**
+         * Get the value associated with the key. Returns null if the key does 
not match the key.
+         *
+         * @param key the key to match
+         * @return the value associated with the key, or null if the value has 
already been recycled or the key does not
+         * match
+         */
         V getValue(K key) {
+            return getValueInternal(key, false);
+        }
+
+        /**
+         * Get the value associated with the Map.Entry's key and value. Exact 
instance of the key is required to match.
+         * @param entry the entry which contains the key and {@link 
EntryWrapper} value to get the value from
+         * @return the value associated with the key, or null if the value has 
already been recycled or the key does not
+         * exactly match the same instance
+         */
+        static <K, V> V getValueMatchingMapEntry(Map.Entry<K, EntryWrapper<K, 
V>> entry) {
+            return entry.getValue().getValueInternal(entry.getKey(), true);
+        }
+
+        /**
+         * Get the value associated with the key. Returns null if the key does 
not match the key associated with the
+         * value.
+         *
+         * @param key                    the key to match
+         * @param requireSameKeyInstance when true, the matching will be 
restricted to exactly the same instance of the
+         *                               key as the one stored in the wrapper. 
This is used to avoid any races
+         *                               when retrieving or removing the 
entries from the cache when the key and value
+         *                               instances are available.
+         * @return the value associated with the key, or null if the key does 
not match
+         */
+        private V getValueInternal(K key, boolean requireSameKeyInstance) {
             long stamp = lock.tryOptimisticRead();
             K localKey = this.key;
             V localValue = this.value;
@@ -116,7 +148,11 @@ public class RangeCache<Key extends Comparable<Key>, Value 
extends ValueWithKeyV
                 localValue = this.value;
                 lock.unlockRead(stamp);
             }
-            if (localKey != key) {
+            // check that the given key matches the key associated with the 
value in the entry
+            // this is used to detect if the entry has already been recycled 
and contains another key
+            // when requireSameKeyInstance is true, the key must be exactly 
the same instance as the one stored in the
+            // entry to match
+            if (localKey != key && (requireSameKeyInstance || localKey == null 
|| !localKey.equals(key))) {
                 return null;
             }
             return localValue;
@@ -236,34 +272,45 @@ public class RangeCache<Key extends Comparable<Key>, 
Value extends ValueWithKeyV
      * The caller is responsible for releasing the reference.
      */
     public Value get(Key key) {
-        return getValue(key, entries.get(key));
+        return getValueFromWrapper(key, entries.get(key));
     }
 
-    private  Value getValue(Key key, EntryWrapper<Key, Value> valueWrapper) {
+    private Value getValueFromWrapper(Key key, EntryWrapper<Key, Value> 
valueWrapper) {
         if (valueWrapper == null) {
             return null;
         } else {
             Value value = valueWrapper.getValue(key);
-            if (value == null) {
-                // the wrapper has been recycled and contains another key
-                return null;
-            }
-            try {
-                value.retain();
-            } catch (IllegalReferenceCountException e) {
-                // Value was already deallocated
-                return null;
-            }
-            // check that the value matches the key and that there's at least 
2 references to it since
-            // the cache should be holding one reference and a new reference 
was just added in this method
-            if (value.refCnt() > 1 && value.matchesKey(key)) {
-                return value;
-            } else {
-                // Value or IdentityWrapper was recycled and already contains 
another value
-                // release the reference added in this method
-                value.release();
-                return null;
-            }
+            return getRetainedValueMatchingKey(key, value);
+        }
+    }
+
+    private Value getValueMatchingEntry(Map.Entry<Key, EntryWrapper<Key, 
Value>> entry) {
+        Value valueMatchingEntry = 
EntryWrapper.getValueMatchingMapEntry(entry);
+        return getRetainedValueMatchingKey(entry.getKey(), valueMatchingEntry);
+    }
+
+    // validates that the value matches the key and that the value has not 
been recycled
+    // which are possible due to the lack of exclusive locks in the cache and 
the use of reference counted objects
+    private Value getRetainedValueMatchingKey(Key key, Value value) {
+        if (value == null) {
+            // the wrapper has been recycled and contains another key
+            return null;
+        }
+        try {
+            value.retain();
+        } catch (IllegalReferenceCountException e) {
+            // Value was already deallocated
+            return null;
+        }
+        // check that the value matches the key and that there's at least 2 
references to it since
+        // the cache should be holding one reference and a new reference was 
just added in this method
+        if (value.refCnt() > 1 && value.matchesKey(key)) {
+            return value;
+        } else {
+            // Value or IdentityWrapper was recycled and already contains 
another value
+            // release the reference added in this method
+            value.release();
+            return null;
         }
     }
 
@@ -280,7 +327,7 @@ public class RangeCache<Key extends Comparable<Key>, Value 
extends ValueWithKeyV
 
         // Return the values of the entries found in cache
         for (Map.Entry<Key, EntryWrapper<Key, Value>> entry : 
entries.subMap(first, true, last, true).entrySet()) {
-            Value value = getValue(entry.getKey(), entry.getValue());
+            Value value = getValueMatchingEntry(entry);
             if (value != null) {
                 values.add(value);
             }
@@ -297,6 +344,9 @@ public class RangeCache<Key extends Comparable<Key>, Value 
extends ValueWithKeyV
      * @return an pair of ints, containing the number of removed entries and 
the total size
      */
     public Pair<Integer, Long> removeRange(Key first, Key last, boolean 
lastInclusive) {
+        if (log.isDebugEnabled()) {
+            log.debug("Removing entries in range [{}, {}], lastInclusive: {}", 
first, last, lastInclusive);
+        }
         RemovalCounters counters = RemovalCounters.create();
         Map<Key, EntryWrapper<Key, Value>> subMap = entries.subMap(first, 
true, last, lastInclusive);
         for (Map.Entry<Key, EntryWrapper<Key, Value>> entry : 
subMap.entrySet()) {
@@ -320,7 +370,7 @@ public class RangeCache<Key extends Comparable<Key>, Value 
extends ValueWithKeyV
                                           boolean skipInvalid, 
Predicate<Value> removeCondition) {
         Key key = entry.getKey();
         EntryWrapper<Key, Value> entryWrapper = entry.getValue();
-        Value value = entryWrapper.getValue(key);
+        Value value = getValueMatchingEntry(entry);
         if (value == null) {
             // the wrapper has already been recycled and contains another key
             if (!skipInvalid) {
@@ -404,6 +454,9 @@ public class RangeCache<Key extends Comparable<Key>, Value 
extends ValueWithKeyV
      * @return a pair containing the number of entries evicted and their total 
size
      */
     public Pair<Integer, Long> evictLeastAccessedEntries(long minSize) {
+        if (log.isDebugEnabled()) {
+            log.debug("Evicting entries to reach a minimum size of {}", 
minSize);
+        }
         checkArgument(minSize > 0);
         RemovalCounters counters = RemovalCounters.create();
         while (counters.removedSize < minSize && 
!Thread.currentThread().isInterrupted()) {
@@ -422,6 +475,9 @@ public class RangeCache<Key extends Comparable<Key>, Value 
extends ValueWithKeyV
     * @return the tota
     */
    public Pair<Integer, Long> evictLEntriesBeforeTimestamp(long maxTimestamp) {
+       if (log.isDebugEnabled()) {
+              log.debug("Evicting entries with timestamp <= {}", maxTimestamp);
+       }
        RemovalCounters counters = RemovalCounters.create();
        while (!Thread.currentThread().isInterrupted()) {
            Map.Entry<Key, EntryWrapper<Key, Value>> entry = 
entries.firstEntry();
@@ -453,6 +509,9 @@ public class RangeCache<Key extends Comparable<Key>, Value 
extends ValueWithKeyV
      * @return size of removed entries
      */
     public Pair<Integer, Long> clear() {
+        if (log.isDebugEnabled()) {
+            log.debug("Clearing the cache with {} entries and size {}", 
entries.size(), size.get());
+        }
         RemovalCounters counters = RemovalCounters.create();
         while (!Thread.currentThread().isInterrupted()) {
             Map.Entry<Key, EntryWrapper<Key, Value>> entry = 
entries.firstEntry();
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java
index e23937afea2..574ed2f3251 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java
@@ -24,24 +24,28 @@ import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
-
 import com.fasterxml.jackson.databind.ObjectMapper;
+import io.netty.buffer.ByteBuf;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.Future;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
-
+import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.BookKeeperTestClient;
 import org.apache.bookkeeper.client.LedgerEntry;
 import org.apache.bookkeeper.client.api.DigestType;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCallback;
 import org.apache.bookkeeper.mledger.Entry;
@@ -53,18 +57,17 @@ import 
org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerAlready
 import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
 import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.PositionFactory;
 import org.apache.bookkeeper.mledger.impl.cache.EntryCacheManager;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats;
 import org.apache.bookkeeper.mledger.util.ThrowableToStringUtil;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
 import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.awaitility.Awaitility;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
-import io.netty.buffer.ByteBuf;
-import lombok.Cleanup;
-
 @Slf4j
 public class ManagedLedgerBkTest extends BookKeeperClusterTestCase {
 
@@ -241,6 +244,108 @@ public class ManagedLedgerBkTest extends 
BookKeeperClusterTestCase {
         assertEquals(factory.getMbean().getNumberOfCacheEvictions(), 0);
     }
 
+    @Test
+    public void verifyAsyncReadEntryUsingCache() throws Exception {
+        ManagedLedgerFactoryConfig config = new ManagedLedgerFactoryConfig();
+
+        config.setMaxCacheSize(100 * 1024 * 1024);
+        config.setCacheEvictionTimeThresholdMillis(10000);
+        config.setCacheEvictionIntervalMs(10000);
+
+        @Cleanup("shutdown")
+        ManagedLedgerFactoryImpl factory = new 
ManagedLedgerFactoryImpl(metadataStore, bkc, config);
+
+        ManagedLedgerConfig conf = new ManagedLedgerConfig();
+        conf.setEnsembleSize(2).setAckQuorumSize(2).setMetadataEnsembleSize(2)
+                .setRetentionSizeInMB(-1).setRetentionTime(-1, 
TimeUnit.MILLISECONDS);
+        final ManagedLedgerImpl ledger = (ManagedLedgerImpl) 
factory.open("my-ledger" + testName, conf);
+
+        int NumProducers = 5;
+        int NumConsumers = 10;
+
+        final AtomicBoolean done = new AtomicBoolean();
+        final CyclicBarrier barrier = new CyclicBarrier(NumProducers + 
NumConsumers + 1);
+
+        List<Future<?>> futures = new ArrayList();
+        List<Position> positions = new CopyOnWriteArrayList<>();
+
+        for (int i = 0; i < NumProducers; i++) {
+            futures.add(executor.submit(() -> {
+                try {
+                    // wait for all threads to be ready to start at once
+                    barrier.await();
+                    while (!done.get()) {
+                        Position position = 
ledger.addEntry("entry".getBytes());
+                        positions.add(position);
+                        Thread.sleep(1);
+                    }
+                } catch (Exception e) {
+                    e.printStackTrace();
+                    throw FutureUtil.wrapToCompletionException(e);
+                }
+            }));
+        }
+
+        // create a dummy cursor since caching happens only when there are 
active consumers
+        ManagedCursor cursor = ledger.openCursor("dummy");
+
+        for (int i = 0; i < NumConsumers; i++) {
+            futures.add(executor.submit(() -> {
+                try {
+                    // wait for all threads to be ready to start at once
+                    barrier.await();
+                    while (!done.get()) {
+                        if (positions.isEmpty()) {
+                            Thread.sleep(1);
+                            continue;
+                        }
+                        // Simulate a replay queue read pattern where 
individual entries are read
+                        Position randomPosition = 
positions.get(ThreadLocalRandom.current().nextInt(positions.size()));
+                        // Clone the original instance so that another 
instance is used in the asyncReadEntry call
+                        // This is to test that keys are compared by .equals 
and not by reference under the covers
+                        randomPosition = 
PositionFactory.create(randomPosition);
+                        CompletableFuture<Void> future = new 
CompletableFuture<>();
+                        ledger.asyncReadEntry(randomPosition, new 
AsyncCallbacks.ReadEntryCallback() {
+                            @Override
+                            public void readEntryComplete(Entry entry, Object 
ctx) {
+                                entry.release();
+                                future.complete(null);
+                            }
+
+                            @Override
+                            public void readEntryFailed(ManagedLedgerException 
exception, Object ctx) {
+                                future.completeExceptionally(exception);
+                            }
+                        }, null);
+                        future.get();
+                        Thread.sleep(2);
+                    }
+                } catch (Exception e) {
+                    e.printStackTrace();
+                    throw FutureUtil.wrapToCompletionException(e);
+                }
+            }));
+        }
+
+        // trigger all worker threads at once to continue from the barrier
+        barrier.await();
+
+        int testDurationSeconds = 3;
+        Thread.sleep(testDurationSeconds * 1000);
+
+        done.set(true);
+        for (Future<?> future : futures) {
+            future.get();
+        }
+
+        factory.getMbean().refreshStats(testDurationSeconds, TimeUnit.SECONDS);
+
+        assertTrue(factory.getMbean().getCacheHitsRate() > 0.0);
+        assertEquals(factory.getMbean().getCacheMissesRate(), 0.0);
+        assertTrue(factory.getMbean().getCacheHitsThroughput() > 0.0);
+        assertEquals(factory.getMbean().getNumberOfCacheEvictions(), 0);
+    }
+
     @Test
     public void testSimple() throws Exception {
         @Cleanup("shutdown")
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/RangeCacheTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/RangeCacheTest.java
index 4bcf2cc6c4e..aa13d4b8e34 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/RangeCacheTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/RangeCacheTest.java
@@ -20,6 +20,7 @@ package org.apache.bookkeeper.mledger.util;
 
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotSame;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
@@ -338,4 +339,20 @@ public class RangeCacheTest {
         cache.clear();
         assertEquals(cache.getNumberOfEntries(), 0);
     }
-}
+
+    @Test
+    public void testGetKeyWithDifferentInstance() {
+        RangeCache<Integer, RefString> cache = new RangeCache<>();
+        Integer key = 129;
+        cache.put(key, new RefString("129"));
+        // create a different instance of the key
+        Integer key2 = Integer.valueOf(129);
+        // key and key2 are different instances but they are equal
+        assertNotSame(key, key2);
+        assertEquals(key, key2);
+        // get the value using key2
+        RefString s = cache.get(key2);
+        // the value should be found
+        assertEquals(s.s, "129");
+    }
+}
\ No newline at end of file
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionBrokerCacheTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionBrokerCacheTest.java
new file mode 100644
index 00000000000..9fca95e2e87
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionBrokerCacheTest.java
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.pulsar.broker.BrokerTestUtil.newUniqueName;
+import static org.assertj.core.api.SoftAssertions.assertSoftly;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import lombok.Cleanup;
+import lombok.SneakyThrows;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.ManagedLedgerFactoryMXBean;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
+import org.apache.pulsar.broker.service.StickyKeyDispatcher;
+import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
+import org.apache.pulsar.tests.KeySharedImplementationType;
+import org.awaitility.Awaitility;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Factory;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker-impl")
+public class KeySharedSubscriptionBrokerCacheTest extends ProducerConsumerBase 
{
+    private static final Logger log = 
LoggerFactory.getLogger(KeySharedSubscriptionBrokerCacheTest.class);
+    private static final String SUBSCRIPTION_NAME = "key_shared";
+    private final KeySharedImplementationType implementationType;
+
+    // Comment out the next line (Factory annotation) to run tests manually in 
IntelliJ, one-by-one
+    @Factory
+    public static Object[] createTestInstances() {
+        return 
KeySharedImplementationType.generateTestInstances(KeySharedSubscriptionBrokerCacheTest::new);
+    }
+
+    public KeySharedSubscriptionBrokerCacheTest() {
+        // set the default implementation type for manual running in IntelliJ
+        this(KeySharedImplementationType.PIP379);
+    }
+
+    public KeySharedSubscriptionBrokerCacheTest(KeySharedImplementationType 
implementationType) {
+        this.implementationType = implementationType;
+    }
+
+    @DataProvider(name = "currentImplementationType")
+    public Object[] currentImplementationType() {
+        return new Object[]{ implementationType };
+    }
+
+    @BeforeClass(alwaysRun = true)
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+    }
+
+    @Override
+    protected void doInitConf() throws Exception {
+        super.doInitConf();
+        
conf.setSubscriptionKeySharedUseClassicPersistentImplementation(implementationType.classic);
+        
conf.setSubscriptionSharedUseClassicPersistentImplementation(implementationType.classic);
+        conf.setUnblockStuckSubscriptionEnabled(false);
+        conf.setSubscriptionKeySharedUseConsistentHashing(true);
+        conf.setManagedLedgerCacheSizeMB(100);
+
+        // configure to evict entries after 30 seconds so that we can test 
retrieval from cache
+        conf.setManagedLedgerCacheEvictionTimeThresholdMillis(30000);
+        conf.setManagedLedgerCacheEvictionIntervalMs(30000);
+
+        // Important: this is currently necessary to make use of cache for 
replay queue reads
+        conf.setCacheEvictionByMarkDeletedPosition(true);
+
+        conf.setManagedLedgerMaxReadsInFlightSizeInMB(100);
+        conf.setDispatcherRetryBackoffInitialTimeInMs(0);
+        conf.setDispatcherRetryBackoffMaxTimeInMs(0);
+        conf.setKeySharedUnblockingIntervalMs(0);
+    }
+
+    @AfterClass(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @AfterMethod(alwaysRun = true)
+    public void resetAfterMethod() throws Exception {
+        List<String> list = admin.namespaces().getTopics("public/default");
+        for (String topicName : list){
+            if (!pulsar.getBrokerService().isSystemTopic(topicName)) {
+                admin.topics().delete(topicName, false);
+            }
+        }
+        pulsarTestContext.getMockBookKeeper().setReadHandleInterceptor(null);
+    }
+
+    // Use a fixed seed to make the tests using random values deterministic
+    // When a test fails, it's possible to re-run it to reproduce the issue
+    private static final Random random = new Random(1);
+
+    private Producer<Integer> createProducer(String topic, boolean 
enableBatch) throws PulsarClientException {
+        Producer<Integer> producer = null;
+        if (enableBatch) {
+            producer = pulsarClient.newProducer(Schema.INT32)
+                    .topic(topic)
+                    .enableBatching(true)
+                    .maxPendingMessages(2001)
+                    .batcherBuilder(BatcherBuilder.KEY_BASED)
+                    .create();
+        } else {
+            producer = pulsarClient.newProducer(Schema.INT32)
+                    .topic(topic)
+                    .maxPendingMessages(2001)
+                    .enableBatching(false)
+                    .create();
+        }
+        return producer;
+    }
+
+    private StickyKeyConsumerSelector getSelector(String topic, String 
subscription) {
+        return getStickyKeyDispatcher(topic, subscription).getSelector();
+    }
+
+    @SneakyThrows
+    private StickyKeyDispatcher getStickyKeyDispatcher(String topic, String 
subscription) {
+        Topic t = 
pulsar.getBrokerService().getTopicIfExists(topic).get().get();
+        PersistentSubscription sub = (PersistentSubscription) 
t.getSubscription(subscription);
+        StickyKeyDispatcher dispatcher = (StickyKeyDispatcher) 
sub.getDispatcher();
+        return dispatcher;
+    }
+
+    @Test(dataProvider = "currentImplementationType", invocationCount = 1)
+    public void testReplayQueueReadsGettingCached(KeySharedImplementationType 
impl) throws Exception {
+        String topic = newUniqueName("testReplayQueueReadsGettingCached");
+        int numberOfKeys = 100;
+        long pauseTime = 100L;
+
+        @Cleanup
+        Producer<Integer> producer = createProducer(topic, false);
+
+        // create a consumer and close it to create a subscription
+        pulsarClient.newConsumer(Schema.INT32)
+                .topic(topic)
+                .subscriptionName(SUBSCRIPTION_NAME)
+                .subscriptionType(SubscriptionType.Key_Shared)
+                .subscribe()
+                .close();
+
+        Set<Integer> remainingMessageValues = Collections.synchronizedSet(new 
HashSet<>());
+        BlockingQueue<Pair<Consumer<Integer>, Message<Integer>>> 
unackedMessages = new LinkedBlockingQueue<>();
+        AtomicBoolean c2MessagesShouldBeUnacked = new AtomicBoolean(true);
+        Set<String> keysForC2 = new HashSet<>();
+        AtomicLong lastMessageTimestamp = new 
AtomicLong(System.currentTimeMillis());
+
+        MessageListener<Integer> messageHandler = (consumer, msg) -> {
+            lastMessageTimestamp.set(System.currentTimeMillis());
+            synchronized (this) {
+                String key = msg.getKey();
+                if (c2MessagesShouldBeUnacked.get() && 
keysForC2.contains(key)) {
+                    unackedMessages.add(Pair.of(consumer, msg));
+                    return;
+                }
+                remainingMessageValues.remove(msg.getValue());
+                consumer.acknowledgeAsync(msg);
+            }
+        };
+
+        
pulsarTestContext.getMockBookKeeper().setReadHandleInterceptor((ledgerId, 
firstEntry, lastEntry, entries) -> {
+            log.error("Attempting to read from BK when cache should be used. 
{}:{} to {}:{}", ledgerId, firstEntry,
+                    ledgerId, lastEntry);
+            return CompletableFuture.failedFuture(
+                    new ManagedLedgerException.NonRecoverableLedgerException(
+                            "Should not read from BK since cache should be 
used."));
+        });
+
+        // Adding a new consumer.
+        @Cleanup
+        Consumer<Integer> c1 = pulsarClient.newConsumer(Schema.INT32)
+                .topic(topic)
+                .consumerName("c1")
+                .subscriptionName(SUBSCRIPTION_NAME)
+                .subscriptionType(SubscriptionType.Key_Shared)
+                .messageListener(messageHandler)
+                .subscribe();
+
+        @Cleanup
+        Consumer<Integer> c2 = pulsarClient.newConsumer(Schema.INT32)
+                .topic(topic)
+                .consumerName("c2")
+                .subscriptionName(SUBSCRIPTION_NAME)
+                .subscriptionType(SubscriptionType.Key_Shared)
+                .messageListener(messageHandler)
+                .subscribe();
+
+        @Cleanup
+        Consumer<Integer> c3 = pulsarClient.newConsumer(Schema.INT32)
+                .topic(topic)
+                .consumerName("c3")
+                .subscriptionName(SUBSCRIPTION_NAME)
+                .subscriptionType(SubscriptionType.Key_Shared)
+                .messageListener(messageHandler)
+                .subscribe();
+
+        StickyKeyDispatcher dispatcher = getStickyKeyDispatcher(topic, 
SUBSCRIPTION_NAME);
+        StickyKeyConsumerSelector selector = dispatcher.getSelector();
+
+        // find keys that will be assigned to c2
+        for (int i = 0; i < numberOfKeys; i++) {
+            String key = String.valueOf(i);
+            byte[] keyBytes = key.getBytes(UTF_8);
+            int hash = selector.makeStickyKeyHash(keyBytes);
+            if (selector.select(hash).consumerName().equals("c2")) {
+                keysForC2.add(key);
+            }
+        }
+
+        // close c2
+        c2.close();
+
+        // produce messages with random keys
+        for (int i = 0; i < 1000; i++) {
+            String key = String.valueOf(random.nextInt(numberOfKeys));
+            //log.info("Producing message with key: {} value: {}", key, i);
+            remainingMessageValues.add(i);
+            producer.newMessage()
+                    .key(key)
+                    .value(i)
+                    .send();
+        }
+
+        // reconnect c2
+        c2 = pulsarClient.newConsumer(Schema.INT32)
+                .topic(topic)
+                .consumerName("c2")
+                .subscriptionName(SUBSCRIPTION_NAME)
+                .subscriptionType(SubscriptionType.Key_Shared)
+                .messageListener(messageHandler)
+                .startPaused(true)
+                .subscribe();
+
+        // ack the unacked messages to unblock c2 keys
+        c2MessagesShouldBeUnacked.set(false);
+        Pair<Consumer<Integer>, Message<Integer>> consumerMessagePair;
+        while ((consumerMessagePair = unackedMessages.poll()) != null) {
+            messageHandler.received(consumerMessagePair.getLeft(), 
consumerMessagePair.getRight());
+        }
+
+        // produce more messages with random keys
+        for (int i = 0; i < 1000; i++) {
+            String key = String.valueOf(random.nextInt(numberOfKeys));
+            //log.info("Producing message with key: {} value: {}", key, i);
+            remainingMessageValues.add(i);
+            producer.newMessage()
+                    .key(key)
+                    .value(i)
+                    .send();
+        }
+
+        c2.resume();
+
+        Awaitility.await().atMost(Duration.ofSeconds(10)).until(() -> {
+            return remainingMessageValues.isEmpty()
+                    || System.currentTimeMillis() - lastMessageTimestamp.get() 
> 50 * pauseTime;
+        });
+
+        try {
+            assertSoftly(softly -> {
+                
softly.assertThat(remainingMessageValues).as("remainingMessageValues").isEmpty();
+                ManagedLedgerFactoryMXBean cacheStats = 
pulsar.getDefaultManagedLedgerFactory().getCacheStats();
+                softly.assertThat(cacheStats.getCacheHitsTotal()).as("cache 
hits").isGreaterThan(0);
+                softly.assertThat(cacheStats.getCacheMissesTotal()).as("cache 
misses").isEqualTo(0);
+                
softly.assertThat(cacheStats.getNumberOfCacheEvictions()).as("cache 
evictions").isEqualTo(0);
+            });
+        } finally {
+            logTopicStats(topic);
+        }
+    }
+}
diff --git a/pulsar-broker/src/test/resources/log4j2.xml 
b/pulsar-broker/src/test/resources/log4j2.xml
index 7b3cd6a04fc..b348a1d04b7 100644
--- a/pulsar-broker/src/test/resources/log4j2.xml
+++ b/pulsar-broker/src/test/resources/log4j2.xml
@@ -53,5 +53,14 @@
       <AppenderRef ref="CONSOLE"/>
     </Logger>
      -->
+
+    <!-- Uncomment the following loggers for debugging broker cache related classes
+    <Logger name="org.apache.bookkeeper.mledger.impl.cache" level="DEBUG" additivity="false">
+      <AppenderRef ref="CONSOLE"/>
+    </Logger>
+    <Logger name="org.apache.bookkeeper.mledger.util.RangeCache" level="DEBUG" additivity="false">
+      <AppenderRef ref="CONSOLE"/>
+    </Logger>
+     -->
   </Loggers>
 </Configuration>
diff --git a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterTest.java b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterTest.java
index 31472794778..f7c343d7421 100644
--- a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterTest.java
+++ b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterTest.java
@@ -55,6 +55,7 @@ import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.PositionFactory;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
@@ -127,6 +128,14 @@ public class TxnLogBufferedWriterTest extends MockedBookKeeperTestCase {
         };
     }
 
+    @Override
+    protected ManagedLedgerFactoryConfig createManagedLedgerFactoryConfig() {
+        ManagedLedgerFactoryConfig managedLedgerFactoryConfig = super.createManagedLedgerFactoryConfig();
+        // disable the broker cache so that assertAllByteBufHasBeenReleased can work correctly.
+        managedLedgerFactoryConfig.setMaxCacheSize(0);
+        return managedLedgerFactoryConfig;
+    }
+
     /**
      * Tests all operations from write to callback, including these step:
      *   1. Write many data.
diff --git a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/test/MockedBookKeeperTestCase.java b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/test/MockedBookKeeperTestCase.java
index ac5aa3bd892..e3e6945620c 100644
--- a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/test/MockedBookKeeperTestCase.java
+++ b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/test/MockedBookKeeperTestCase.java
@@ -82,10 +82,14 @@ public abstract class MockedBookKeeperTestCase {
             throw e;
         }
 
-        ManagedLedgerFactoryConfig conf = new ManagedLedgerFactoryConfig();
+        ManagedLedgerFactoryConfig conf = createManagedLedgerFactoryConfig();
         factory = new ManagedLedgerFactoryImpl(metadataStore, bkc, conf);
     }
 
+    protected ManagedLedgerFactoryConfig createManagedLedgerFactoryConfig() {
+        return new ManagedLedgerFactoryConfig();
+    }
+
     @AfterMethod(alwaysRun = true)
     public void tearDown(Method method) {
         try {


Reply via email to